Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / public.py @ efb1f3d3

History | View | Annotate | Download (26 kB)

1 bc4f25c0 Sofia Papagiannaki
# Copyright 2011 GRNET S.A. All rights reserved.
2 bc4f25c0 Sofia Papagiannaki
#
3 bc4f25c0 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
4 bc4f25c0 Sofia Papagiannaki
# without modification, are permitted provided that the following
5 bc4f25c0 Sofia Papagiannaki
# conditions are met:
6 bc4f25c0 Sofia Papagiannaki
#
7 bc4f25c0 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
8 bc4f25c0 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
9 bc4f25c0 Sofia Papagiannaki
#      disclaimer.
10 bc4f25c0 Sofia Papagiannaki
#
11 bc4f25c0 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
12 bc4f25c0 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
13 bc4f25c0 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
14 bc4f25c0 Sofia Papagiannaki
#      provided with the distribution.
15 bc4f25c0 Sofia Papagiannaki
#
16 bc4f25c0 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 bc4f25c0 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 bc4f25c0 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 bc4f25c0 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 bc4f25c0 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 bc4f25c0 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 bc4f25c0 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 bc4f25c0 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 bc4f25c0 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 bc4f25c0 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 bc4f25c0 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 bc4f25c0 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
28 bc4f25c0 Sofia Papagiannaki
#
29 bc4f25c0 Sofia Papagiannaki
# The views and conclusions contained in the software and
30 bc4f25c0 Sofia Papagiannaki
# documentation are those of the authors and should not be
31 bc4f25c0 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
32 bc4f25c0 Sofia Papagiannaki
# or implied, of GRNET S.A.
33 bc4f25c0 Sofia Papagiannaki
34 bc4f25c0 Sofia Papagiannaki
import random
35 bc4f25c0 Sofia Papagiannaki
import datetime
36 bc4f25c0 Sofia Papagiannaki
import time as _time
37 efb1f3d3 Sofia Papagiannaki
import re
38 efb1f3d3 Sofia Papagiannaki
39 efb1f3d3 Sofia Papagiannaki
from functools import partial
40 bc4f25c0 Sofia Papagiannaki
41 bc4f25c0 Sofia Papagiannaki
from synnefo.lib import join_urls
42 bc4f25c0 Sofia Papagiannaki
43 bc4f25c0 Sofia Papagiannaki
import django.utils.simplejson as json
44 bc4f25c0 Sofia Papagiannaki
45 efb1f3d3 Sofia Papagiannaki
from pithos.api.test import (PithosAPITest, DATE_FORMATS, TEST_BLOCK_SIZE,
46 efb1f3d3 Sofia Papagiannaki
                             TEST_HASH_ALGORITHM)
47 efb1f3d3 Sofia Papagiannaki
48 efb1f3d3 Sofia Papagiannaki
from pithos.api.test.util import (get_random_name, get_random_data, md5_hash,
49 efb1f3d3 Sofia Papagiannaki
                                  merkle)
50 bc4f25c0 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
51 bc4f25c0 Sofia Papagiannaki
52 efb1f3d3 Sofia Papagiannaki
merkle = partial(merkle,
53 efb1f3d3 Sofia Papagiannaki
                 blocksize=TEST_BLOCK_SIZE,
54 efb1f3d3 Sofia Papagiannaki
                 blockhash=TEST_HASH_ALGORITHM)
55 efb1f3d3 Sofia Papagiannaki
56 bc4f25c0 Sofia Papagiannaki
57 bc4f25c0 Sofia Papagiannaki
class TestPublic(PithosAPITest):
58 bc4f25c0 Sofia Papagiannaki
    def _assert_not_public_object(self, cname, oname):
59 bc4f25c0 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
60 bc4f25c0 Sofia Papagiannaki
        self.assertTrue('X-Object-Public' not in info)
61 bc4f25c0 Sofia Papagiannaki
62 bc4f25c0 Sofia Papagiannaki
    def _assert_public_object(self, cname, oname, odata):
63 bc4f25c0 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
64 bc4f25c0 Sofia Papagiannaki
        self.assertTrue('X-Object-Public' in info)
65 bc4f25c0 Sofia Papagiannaki
        public = info['X-Object-Public']
66 bc4f25c0 Sofia Papagiannaki
67 bc4f25c0 Sofia Papagiannaki
        self.assertTrue(len(public) >= pithos_settings.PUBLIC_URL_SECURITY)
68 bc4f25c0 Sofia Papagiannaki
        (self.assertTrue(l in pithos_settings.PUBLIC_URL_ALPHABET) for
69 bc4f25c0 Sofia Papagiannaki
         l in public)
70 bc4f25c0 Sofia Papagiannaki
71 f53483d8 Sofia Papagiannaki
        r = self.get(public, user='user2', token=None)
72 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
73 bc4f25c0 Sofia Papagiannaki
        self.assertTrue('X-Object-Public' not in r)
74 bc4f25c0 Sofia Papagiannaki
75 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.content, odata)
76 bc4f25c0 Sofia Papagiannaki
77 bc4f25c0 Sofia Papagiannaki
        # assert other users cannot access the object using the priavate path
78 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
79 bc4f25c0 Sofia Papagiannaki
        r = self.head(url, user='user2')
80 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
81 bc4f25c0 Sofia Papagiannaki
82 bc4f25c0 Sofia Papagiannaki
        r = self.get(url, user='user2')
83 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
84 bc4f25c0 Sofia Papagiannaki
85 bc4f25c0 Sofia Papagiannaki
        return public
86 bc4f25c0 Sofia Papagiannaki
87 bc4f25c0 Sofia Papagiannaki
    def test_set_object_public(self):
88 bc4f25c0 Sofia Papagiannaki
        cname = get_random_name()
89 bc4f25c0 Sofia Papagiannaki
        self.create_container(cname)
90 bc4f25c0 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
91 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
92 bc4f25c0 Sofia Papagiannaki
93 bc4f25c0 Sofia Papagiannaki
        # set public
94 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
95 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
96 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
97 bc4f25c0 Sofia Papagiannaki
98 bc4f25c0 Sofia Papagiannaki
        self._assert_public_object(cname, oname, odata)
99 bc4f25c0 Sofia Papagiannaki
100 bc4f25c0 Sofia Papagiannaki
    def test_set_twice(self):
101 bc4f25c0 Sofia Papagiannaki
        cname = get_random_name()
102 bc4f25c0 Sofia Papagiannaki
        self.create_container(cname)
103 bc4f25c0 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
104 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
105 bc4f25c0 Sofia Papagiannaki
106 bc4f25c0 Sofia Papagiannaki
        # set public
107 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
108 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
109 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
110 bc4f25c0 Sofia Papagiannaki
111 bc4f25c0 Sofia Papagiannaki
        public = self._assert_public_object(cname, oname, odata)
112 bc4f25c0 Sofia Papagiannaki
113 bc4f25c0 Sofia Papagiannaki
        # set public
114 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
115 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
116 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
117 bc4f25c0 Sofia Papagiannaki
118 bc4f25c0 Sofia Papagiannaki
        public2 = self._assert_public_object(cname, oname, odata)
119 bc4f25c0 Sofia Papagiannaki
120 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(public, public2)
121 bc4f25c0 Sofia Papagiannaki
122 bc4f25c0 Sofia Papagiannaki
    def test_set_unset_set(self):
123 bc4f25c0 Sofia Papagiannaki
        cname = get_random_name()
124 bc4f25c0 Sofia Papagiannaki
        self.create_container(cname)
125 bc4f25c0 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
126 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
127 bc4f25c0 Sofia Papagiannaki
128 bc4f25c0 Sofia Papagiannaki
        # set public
129 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
130 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
131 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
132 bc4f25c0 Sofia Papagiannaki
133 bc4f25c0 Sofia Papagiannaki
        public = self._assert_public_object(cname, oname, odata)
134 bc4f25c0 Sofia Papagiannaki
135 bc4f25c0 Sofia Papagiannaki
        # unset public
136 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
137 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
138 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
139 bc4f25c0 Sofia Papagiannaki
140 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
141 bc4f25c0 Sofia Papagiannaki
142 bc4f25c0 Sofia Papagiannaki
        # set public
143 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
144 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
145 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
146 bc4f25c0 Sofia Papagiannaki
147 bc4f25c0 Sofia Papagiannaki
        public2 = self._assert_public_object(cname, oname, odata)
148 bc4f25c0 Sofia Papagiannaki
149 bc4f25c0 Sofia Papagiannaki
        self.assertTrue(public != public2)
150 bc4f25c0 Sofia Papagiannaki
151 bc4f25c0 Sofia Papagiannaki
        # unset public
152 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
153 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
154 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
155 bc4f25c0 Sofia Papagiannaki
156 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
157 bc4f25c0 Sofia Papagiannaki
158 bc4f25c0 Sofia Papagiannaki
    def test_update_public_object(self):
159 bc4f25c0 Sofia Papagiannaki
        cname = get_random_name()
160 bc4f25c0 Sofia Papagiannaki
        self.create_container(cname)
161 bc4f25c0 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
162 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
163 bc4f25c0 Sofia Papagiannaki
164 bc4f25c0 Sofia Papagiannaki
        # set public
165 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
166 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
167 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
168 bc4f25c0 Sofia Papagiannaki
169 bc4f25c0 Sofia Papagiannaki
        public = self._assert_public_object(cname, oname, odata)
170 bc4f25c0 Sofia Papagiannaki
171 bc4f25c0 Sofia Papagiannaki
        odata2 = self.append_object_data(cname, oname)[1]
172 bc4f25c0 Sofia Papagiannaki
173 bc4f25c0 Sofia Papagiannaki
        public2 = self._assert_public_object(cname, oname, odata + odata2)
174 bc4f25c0 Sofia Papagiannaki
175 bc4f25c0 Sofia Papagiannaki
        self.assertTrue(public == public2)
176 bc4f25c0 Sofia Papagiannaki
177 bc4f25c0 Sofia Papagiannaki
    def test_delete_public_object(self):
178 bc4f25c0 Sofia Papagiannaki
        cname = get_random_name()
179 bc4f25c0 Sofia Papagiannaki
        self.create_container(cname)
180 bc4f25c0 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
181 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
182 bc4f25c0 Sofia Papagiannaki
183 bc4f25c0 Sofia Papagiannaki
        # set public
184 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
185 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
186 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
187 bc4f25c0 Sofia Papagiannaki
        public = self._assert_public_object(cname, oname, odata)
188 bc4f25c0 Sofia Papagiannaki
189 bc4f25c0 Sofia Papagiannaki
        # delete object
190 bc4f25c0 Sofia Papagiannaki
        r = self.delete(url)
191 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
192 bc4f25c0 Sofia Papagiannaki
        r = self.get(url)
193 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
194 bc4f25c0 Sofia Papagiannaki
        r = self.get(public)
195 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
196 bc4f25c0 Sofia Papagiannaki
197 bc4f25c0 Sofia Papagiannaki
    def test_delete_public_object_history(self):
198 bc4f25c0 Sofia Papagiannaki
        cname = get_random_name()
199 bc4f25c0 Sofia Papagiannaki
        self.create_container(cname)
200 bc4f25c0 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
201 bc4f25c0 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
202 bc4f25c0 Sofia Papagiannaki
203 bc4f25c0 Sofia Papagiannaki
        # set public
204 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
205 bc4f25c0 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
206 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
207 bc4f25c0 Sofia Papagiannaki
        public = self._assert_public_object(cname, oname, odata)
208 bc4f25c0 Sofia Papagiannaki
209 bc4f25c0 Sofia Papagiannaki
        for _ in range(random.randint(1, 10)):
210 bc4f25c0 Sofia Papagiannaki
            odata += self.append_object_data(cname, oname)[1]
211 bc4f25c0 Sofia Papagiannaki
            _time.sleep(1)
212 bc4f25c0 Sofia Papagiannaki
213 bc4f25c0 Sofia Papagiannaki
        # get object versions
214 bc4f25c0 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
215 bc4f25c0 Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
216 bc4f25c0 Sofia Papagiannaki
        version_list = json.loads(r.content)['versions']
217 f5bbcb5d Sofia Papagiannaki
        mtime = [int(float(t[1])) for t in version_list]
218 bc4f25c0 Sofia Papagiannaki
219 bc4f25c0 Sofia Papagiannaki
        # delete object history
220 bc4f25c0 Sofia Papagiannaki
        i = random.randrange(len(mtime))
221 bc4f25c0 Sofia Papagiannaki
        self.delete('%s?until=%d' % (url, mtime[i]))
222 bc4f25c0 Sofia Papagiannaki
        public2 = self._assert_public_object(cname, oname, odata)
223 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(public, public2)
224 bc4f25c0 Sofia Papagiannaki
225 f53483d8 Sofia Papagiannaki
        # delete object history until now
226 bc4f25c0 Sofia Papagiannaki
        _time.sleep(1)
227 bc4f25c0 Sofia Papagiannaki
        t = datetime.datetime.utcnow()
228 bc4f25c0 Sofia Papagiannaki
        now = int(_time.mktime(t.timetuple()))
229 bc4f25c0 Sofia Papagiannaki
        r = self.delete('%s?intil=%d' % (url, now))
230 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
231 bc4f25c0 Sofia Papagiannaki
        r = self.get(url)
232 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
233 bc4f25c0 Sofia Papagiannaki
        r = self.get(public)
234 bc4f25c0 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
235 f53483d8 Sofia Papagiannaki
236 f53483d8 Sofia Papagiannaki
    def test_public_manifest(self):
237 f53483d8 Sofia Papagiannaki
        cname = self.create_container()[0]
238 f53483d8 Sofia Papagiannaki
        prefix = 'myobject/'
239 f53483d8 Sofia Papagiannaki
        data = ''
240 f53483d8 Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
241 f53483d8 Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
242 f53483d8 Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
243 f53483d8 Sofia Papagiannaki
244 f53483d8 Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
245 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
246 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
247 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest,
248 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_PUBLIC='true')
249 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
250 f53483d8 Sofia Papagiannaki
251 f53483d8 Sofia Papagiannaki
        r = self.head(url)
252 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Manifest' in r)
253 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Manifest'], manifest)
254 f53483d8 Sofia Papagiannaki
255 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Public' in r)
256 f53483d8 Sofia Papagiannaki
        public = r['X-Object-Public']
257 f53483d8 Sofia Papagiannaki
258 f53483d8 Sofia Papagiannaki
        r = self.get(public)
259 f53483d8 Sofia Papagiannaki
        self.assertTrue(r.content, data)
260 f53483d8 Sofia Papagiannaki
        #self.assertTrue('X-Object-Manifest' in r)
261 f53483d8 Sofia Papagiannaki
        #self.assertEqual(r['X-Object-Manifest'], manifest)
262 efb1f3d3 Sofia Papagiannaki
263 efb1f3d3 Sofia Papagiannaki
    def test_public_get_partial(self):
264 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
265 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
266 efb1f3d3 Sofia Papagiannaki
267 efb1f3d3 Sofia Papagiannaki
        # set public
268 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
269 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
270 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
271 efb1f3d3 Sofia Papagiannaki
272 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
273 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
274 efb1f3d3 Sofia Papagiannaki
275 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_RANGE='bytes=0-499')
276 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
277 efb1f3d3 Sofia Papagiannaki
        data = r.content
278 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(data, odata[:500])
279 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
280 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
281 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
282 efb1f3d3 Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
283 efb1f3d3 Sofia Papagiannaki
284 efb1f3d3 Sofia Papagiannaki
    def test_public_get_final_500(self):
285 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
286 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
287 efb1f3d3 Sofia Papagiannaki
        size = len(odata)
288 efb1f3d3 Sofia Papagiannaki
289 efb1f3d3 Sofia Papagiannaki
        # set public
290 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
291 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
292 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
293 efb1f3d3 Sofia Papagiannaki
294 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
295 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
296 efb1f3d3 Sofia Papagiannaki
297 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_RANGE='bytes=-500')
298 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
299 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.content, odata[-500:])
300 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
301 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
302 efb1f3d3 Sofia Papagiannaki
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
303 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
304 efb1f3d3 Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
305 efb1f3d3 Sofia Papagiannaki
306 efb1f3d3 Sofia Papagiannaki
    def test_public_get_rest(self):
307 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
308 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
309 efb1f3d3 Sofia Papagiannaki
        size = len(odata)
310 efb1f3d3 Sofia Papagiannaki
        offset = len(odata) - random.randint(1, 512)
311 efb1f3d3 Sofia Papagiannaki
312 efb1f3d3 Sofia Papagiannaki
        # set public
313 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
314 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
315 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
316 efb1f3d3 Sofia Papagiannaki
317 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
318 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
319 efb1f3d3 Sofia Papagiannaki
320 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_RANGE='bytes=%s-' % offset)
321 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
322 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.content, odata[offset:])
323 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
324 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
325 efb1f3d3 Sofia Papagiannaki
                         'bytes %s-%s/%s' % (offset, size - 1, size))
326 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
327 efb1f3d3 Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
328 efb1f3d3 Sofia Papagiannaki
329 efb1f3d3 Sofia Papagiannaki
    def test_public_get_range_not_satisfiable(self):
330 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
331 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
332 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
333 efb1f3d3 Sofia Papagiannaki
334 efb1f3d3 Sofia Papagiannaki
        offset = len(odata) + 1
335 efb1f3d3 Sofia Papagiannaki
336 efb1f3d3 Sofia Papagiannaki
        # set public
337 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
338 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
339 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
340 efb1f3d3 Sofia Papagiannaki
341 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
342 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
343 efb1f3d3 Sofia Papagiannaki
344 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_RANGE='bytes=0-%s' % offset)
345 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
346 efb1f3d3 Sofia Papagiannaki
347 efb1f3d3 Sofia Papagiannaki
    def test_public_multiple_range(self):
348 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
349 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
350 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
351 efb1f3d3 Sofia Papagiannaki
352 efb1f3d3 Sofia Papagiannaki
        # set public
353 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
354 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
355 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
356 efb1f3d3 Sofia Papagiannaki
357 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
358 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
359 efb1f3d3 Sofia Papagiannaki
360 efb1f3d3 Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
361 efb1f3d3 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
362 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_RANGE=ranges)
363 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
364 efb1f3d3 Sofia Papagiannaki
        self.assertTrue('content-type' in r)
365 efb1f3d3 Sofia Papagiannaki
        p = re.compile(
366 efb1f3d3 Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
367 efb1f3d3 Sofia Papagiannaki
            re.I)
368 efb1f3d3 Sofia Papagiannaki
        m = p.match(r['content-type'])
369 efb1f3d3 Sofia Papagiannaki
        if m is None:
370 efb1f3d3 Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
371 efb1f3d3 Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
372 efb1f3d3 Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
373 efb1f3d3 Sofia Papagiannaki
374 efb1f3d3 Sofia Papagiannaki
        # assert content parts length
375 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
376 efb1f3d3 Sofia Papagiannaki
377 efb1f3d3 Sofia Papagiannaki
        # for each content part assert headers
378 efb1f3d3 Sofia Papagiannaki
        i = 0
379 efb1f3d3 Sofia Papagiannaki
        for cpart in cparts:
380 efb1f3d3 Sofia Papagiannaki
            content = cpart.split('\r\n')
381 efb1f3d3 Sofia Papagiannaki
            headers = content[1:3]
382 efb1f3d3 Sofia Papagiannaki
            content_range = headers[0].split(': ')
383 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
384 efb1f3d3 Sofia Papagiannaki
385 efb1f3d3 Sofia Papagiannaki
            r = l[i].split('-')
386 efb1f3d3 Sofia Papagiannaki
            if not r[0] and not r[1]:
387 efb1f3d3 Sofia Papagiannaki
                pass
388 efb1f3d3 Sofia Papagiannaki
            elif not r[0]:
389 efb1f3d3 Sofia Papagiannaki
                start = len(odata) - int(r[1])
390 efb1f3d3 Sofia Papagiannaki
                end = len(odata)
391 efb1f3d3 Sofia Papagiannaki
            elif not r[1]:
392 efb1f3d3 Sofia Papagiannaki
                start = int(r[0])
393 efb1f3d3 Sofia Papagiannaki
                end = len(odata)
394 efb1f3d3 Sofia Papagiannaki
            else:
395 efb1f3d3 Sofia Papagiannaki
                start = int(r[0])
396 efb1f3d3 Sofia Papagiannaki
                end = int(r[1]) + 1
397 efb1f3d3 Sofia Papagiannaki
            fdata = odata[start:end]
398 efb1f3d3 Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
399 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
400 efb1f3d3 Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
401 efb1f3d3 Sofia Papagiannaki
            i += 1
402 efb1f3d3 Sofia Papagiannaki
403 efb1f3d3 Sofia Papagiannaki
    def test_public_multiple_range_not_satisfiable(self):
404 efb1f3d3 Sofia Papagiannaki
        # perform get with multiple range
405 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
406 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
407 efb1f3d3 Sofia Papagiannaki
408 efb1f3d3 Sofia Papagiannaki
        # set public
409 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
410 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
411 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
412 efb1f3d3 Sofia Papagiannaki
413 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
414 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
415 efb1f3d3 Sofia Papagiannaki
416 efb1f3d3 Sofia Papagiannaki
        out_of_range = len(odata) + 1
417 efb1f3d3 Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
418 efb1f3d3 Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
419 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_RANGE=ranges)
420 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
421 efb1f3d3 Sofia Papagiannaki
422 efb1f3d3 Sofia Papagiannaki
    def test_public_get_if_match(self):
423 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
424 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
425 efb1f3d3 Sofia Papagiannaki
426 efb1f3d3 Sofia Papagiannaki
        # set public
427 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
428 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
429 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
430 efb1f3d3 Sofia Papagiannaki
431 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
432 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
433 efb1f3d3 Sofia Papagiannaki
434 efb1f3d3 Sofia Papagiannaki
        def assert_matches(etag):
435 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_MATCH=etag)
436 efb1f3d3 Sofia Papagiannaki
437 efb1f3d3 Sofia Papagiannaki
            # assert get success
438 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
439 efb1f3d3 Sofia Papagiannaki
440 efb1f3d3 Sofia Papagiannaki
            # assert response content
441 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
442 efb1f3d3 Sofia Papagiannaki
443 efb1f3d3 Sofia Papagiannaki
        # perform get with If-Match
444 efb1f3d3 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
445 efb1f3d3 Sofia Papagiannaki
            assert_matches(md5_hash(odata))
446 efb1f3d3 Sofia Papagiannaki
        else:
447 efb1f3d3 Sofia Papagiannaki
            assert_matches(merkle(odata))
448 efb1f3d3 Sofia Papagiannaki
449 efb1f3d3 Sofia Papagiannaki
    def test_public_get_if_match_star(self):
450 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
451 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
452 efb1f3d3 Sofia Papagiannaki
453 efb1f3d3 Sofia Papagiannaki
        # set public
454 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
455 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
456 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
457 efb1f3d3 Sofia Papagiannaki
458 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
459 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
460 efb1f3d3 Sofia Papagiannaki
461 efb1f3d3 Sofia Papagiannaki
        # perform get with If-Match *
462 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_IF_MATCH='*')
463 efb1f3d3 Sofia Papagiannaki
464 efb1f3d3 Sofia Papagiannaki
        # assert get success
465 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
466 efb1f3d3 Sofia Papagiannaki
467 efb1f3d3 Sofia Papagiannaki
        # assert response content
468 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.content, odata)
469 efb1f3d3 Sofia Papagiannaki
470 efb1f3d3 Sofia Papagiannaki
    def test_public_get_multiple_if_match(self):
471 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
472 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
473 efb1f3d3 Sofia Papagiannaki
474 efb1f3d3 Sofia Papagiannaki
        # set public
475 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
476 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
477 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
478 efb1f3d3 Sofia Papagiannaki
479 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
480 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
481 efb1f3d3 Sofia Papagiannaki
482 efb1f3d3 Sofia Papagiannaki
        def assert_multiple_match(etag):
483 efb1f3d3 Sofia Papagiannaki
            quoted = lambda s: '"%s"' % s
484 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_MATCH=','.join(
485 efb1f3d3 Sofia Papagiannaki
                [quoted(etag), quoted(get_random_data(64))]))
486 efb1f3d3 Sofia Papagiannaki
487 efb1f3d3 Sofia Papagiannaki
            # assert get success
488 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
489 efb1f3d3 Sofia Papagiannaki
490 efb1f3d3 Sofia Papagiannaki
            # assert response content
491 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
492 efb1f3d3 Sofia Papagiannaki
493 efb1f3d3 Sofia Papagiannaki
        # perform get with If-Match
494 efb1f3d3 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
495 efb1f3d3 Sofia Papagiannaki
            assert_multiple_match(md5_hash(odata))
496 efb1f3d3 Sofia Papagiannaki
        else:
497 efb1f3d3 Sofia Papagiannaki
            assert_multiple_match(merkle(odata))
498 efb1f3d3 Sofia Papagiannaki
499 efb1f3d3 Sofia Papagiannaki
    def test_public_if_match_precondition_failed(self):
500 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
501 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
502 efb1f3d3 Sofia Papagiannaki
503 efb1f3d3 Sofia Papagiannaki
        # set public
504 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
505 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
506 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
507 efb1f3d3 Sofia Papagiannaki
508 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
509 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
510 efb1f3d3 Sofia Papagiannaki
511 efb1f3d3 Sofia Papagiannaki
        # perform get with If-Match
512 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_IF_MATCH=get_random_name())
513 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
514 efb1f3d3 Sofia Papagiannaki
515 efb1f3d3 Sofia Papagiannaki
    def test_public_if_none_match(self):
516 efb1f3d3 Sofia Papagiannaki
        # upload object
517 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
518 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
519 efb1f3d3 Sofia Papagiannaki
520 efb1f3d3 Sofia Papagiannaki
        # set public
521 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
522 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
523 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
524 efb1f3d3 Sofia Papagiannaki
525 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
526 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
527 efb1f3d3 Sofia Papagiannaki
528 efb1f3d3 Sofia Papagiannaki
        def assert_non_match(etag):
529 efb1f3d3 Sofia Papagiannaki
            # perform get with If-None-Match
530 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_NONE_MATCH=etag)
531 efb1f3d3 Sofia Papagiannaki
532 efb1f3d3 Sofia Papagiannaki
            # assert precondition_failed
533 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
534 efb1f3d3 Sofia Papagiannaki
535 efb1f3d3 Sofia Papagiannaki
            # update object data
536 efb1f3d3 Sofia Papagiannaki
            r = self.append_object_data(cname, oname)[-1]
537 efb1f3d3 Sofia Papagiannaki
            self.assertTrue(etag != r['ETag'])
538 efb1f3d3 Sofia Papagiannaki
539 efb1f3d3 Sofia Papagiannaki
            # perform get with If-None-Match
540 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_NONE_MATCH=etag)
541 efb1f3d3 Sofia Papagiannaki
542 efb1f3d3 Sofia Papagiannaki
            # assert get success
543 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
544 efb1f3d3 Sofia Papagiannaki
545 efb1f3d3 Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
546 efb1f3d3 Sofia Papagiannaki
            assert_non_match(md5_hash(odata))
547 efb1f3d3 Sofia Papagiannaki
        else:
548 efb1f3d3 Sofia Papagiannaki
            assert_non_match(merkle(odata))
549 efb1f3d3 Sofia Papagiannaki
550 efb1f3d3 Sofia Papagiannaki
    def test_public_if_none_match_star(self):
551 efb1f3d3 Sofia Papagiannaki
        # upload object
552 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
553 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
554 efb1f3d3 Sofia Papagiannaki
555 efb1f3d3 Sofia Papagiannaki
        # set public
556 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
557 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
558 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
559 efb1f3d3 Sofia Papagiannaki
560 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
561 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
562 efb1f3d3 Sofia Papagiannaki
563 efb1f3d3 Sofia Papagiannaki
        # perform get with If-None-Match with star
564 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_IF_NONE_MATCH='*')
565 efb1f3d3 Sofia Papagiannaki
566 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
567 efb1f3d3 Sofia Papagiannaki
568 efb1f3d3 Sofia Papagiannaki
    def test_public_if_modified_sinse(self):
569 efb1f3d3 Sofia Papagiannaki
        cname = get_random_name()
570 efb1f3d3 Sofia Papagiannaki
        self.create_container(cname)
571 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
572 efb1f3d3 Sofia Papagiannaki
        self._assert_not_public_object(cname, oname)
573 efb1f3d3 Sofia Papagiannaki
574 efb1f3d3 Sofia Papagiannaki
        # set public
575 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
576 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
577 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
578 efb1f3d3 Sofia Papagiannaki
579 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
580 efb1f3d3 Sofia Papagiannaki
        public = info['X-Object-Public']
581 efb1f3d3 Sofia Papagiannaki
582 efb1f3d3 Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
583 efb1f3d3 Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
584 efb1f3d3 Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
585 efb1f3d3 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
586 efb1f3d3 Sofia Papagiannaki
587 efb1f3d3 Sofia Papagiannaki
        for t in t1_formats:
588 efb1f3d3 Sofia Papagiannaki
            r = self.get(public, user='user2', HTTP_IF_MODIFIED_SINCE=t,
589 efb1f3d3 Sofia Papagiannaki
                         token=None)
590 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
591 efb1f3d3 Sofia Papagiannaki
592 efb1f3d3 Sofia Papagiannaki
        _time.sleep(1)
593 efb1f3d3 Sofia Papagiannaki
594 efb1f3d3 Sofia Papagiannaki
        # update object data
595 efb1f3d3 Sofia Papagiannaki
        appended_data = self.append_object_data(cname, oname)[1]
596 efb1f3d3 Sofia Papagiannaki
597 efb1f3d3 Sofia Papagiannaki
        # Check modified since
598 efb1f3d3 Sofia Papagiannaki
        for t in t1_formats:
599 efb1f3d3 Sofia Papagiannaki
            r = self.get(public, user='user2', HTTP_IF_MODIFIED_SINCE=t,
600 efb1f3d3 Sofia Papagiannaki
                         token=None)
601 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
602 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.content, odata + appended_data)
603 efb1f3d3 Sofia Papagiannaki
604 efb1f3d3 Sofia Papagiannaki
    def test_public_if_modified_since_invalid_date(self):
605 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
606 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
607 efb1f3d3 Sofia Papagiannaki
608 efb1f3d3 Sofia Papagiannaki
        # set public
609 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
610 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
611 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
612 efb1f3d3 Sofia Papagiannaki
613 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
614 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
615 efb1f3d3 Sofia Papagiannaki
616 efb1f3d3 Sofia Papagiannaki
        r = self.get(public_url, HTTP_IF_MODIFIED_SINCE='Monday')
617 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
618 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.content, odata)
619 efb1f3d3 Sofia Papagiannaki
620 efb1f3d3 Sofia Papagiannaki
    def test_public_if_public_not_modified_since(self):
621 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
622 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
623 efb1f3d3 Sofia Papagiannaki
624 efb1f3d3 Sofia Papagiannaki
        # set public
625 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
626 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
627 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
628 efb1f3d3 Sofia Papagiannaki
629 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
630 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
631 efb1f3d3 Sofia Papagiannaki
        last_modified = info['Last-Modified']
632 efb1f3d3 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
633 efb1f3d3 Sofia Papagiannaki
634 efb1f3d3 Sofia Papagiannaki
        # Check unmodified
635 efb1f3d3 Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
636 efb1f3d3 Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
637 efb1f3d3 Sofia Papagiannaki
        for t in t1_formats:
638 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
639 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
640 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
641 efb1f3d3 Sofia Papagiannaki
642 efb1f3d3 Sofia Papagiannaki
        # modify object
643 efb1f3d3 Sofia Papagiannaki
        _time.sleep(2)
644 efb1f3d3 Sofia Papagiannaki
        self.append_object_data(cname, oname)
645 efb1f3d3 Sofia Papagiannaki
646 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
647 efb1f3d3 Sofia Papagiannaki
        last_modified = info['Last-Modified']
648 efb1f3d3 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
649 efb1f3d3 Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
650 efb1f3d3 Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
651 efb1f3d3 Sofia Papagiannaki
652 efb1f3d3 Sofia Papagiannaki
        # check modified
653 efb1f3d3 Sofia Papagiannaki
        for t in t2_formats:
654 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
655 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
656 efb1f3d3 Sofia Papagiannaki
657 efb1f3d3 Sofia Papagiannaki
        # modify account: update object meta
658 efb1f3d3 Sofia Papagiannaki
        _time.sleep(1)
659 efb1f3d3 Sofia Papagiannaki
        self.update_object_meta(cname, oname, {'foo': 'bar'})
660 efb1f3d3 Sofia Papagiannaki
661 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
662 efb1f3d3 Sofia Papagiannaki
        last_modified = info['Last-Modified']
663 efb1f3d3 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
664 efb1f3d3 Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
665 efb1f3d3 Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
666 efb1f3d3 Sofia Papagiannaki
667 efb1f3d3 Sofia Papagiannaki
        # check modified
668 efb1f3d3 Sofia Papagiannaki
        for t in t3_formats:
669 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
670 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
671 efb1f3d3 Sofia Papagiannaki
672 efb1f3d3 Sofia Papagiannaki
    def test_public_if_unmodified_since(self):
673 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
674 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
675 efb1f3d3 Sofia Papagiannaki
676 efb1f3d3 Sofia Papagiannaki
        # set public
677 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
678 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
679 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
680 efb1f3d3 Sofia Papagiannaki
681 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
682 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
683 efb1f3d3 Sofia Papagiannaki
        last_modified = info['Last-Modified']
684 efb1f3d3 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
685 efb1f3d3 Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
686 efb1f3d3 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
687 efb1f3d3 Sofia Papagiannaki
688 efb1f3d3 Sofia Papagiannaki
        for tf in t_formats:
689 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=tf)
690 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
691 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
692 efb1f3d3 Sofia Papagiannaki
693 efb1f3d3 Sofia Papagiannaki
    def test_public_if_unmodified_since_precondition_failed(self):
694 efb1f3d3 Sofia Papagiannaki
        cname = self.create_container()[0]
695 efb1f3d3 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
696 efb1f3d3 Sofia Papagiannaki
697 efb1f3d3 Sofia Papagiannaki
        # set public
698 efb1f3d3 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
699 efb1f3d3 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
700 efb1f3d3 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
701 efb1f3d3 Sofia Papagiannaki
702 efb1f3d3 Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
703 efb1f3d3 Sofia Papagiannaki
        public_url = info['X-Object-Public']
704 efb1f3d3 Sofia Papagiannaki
        last_modified = info['Last-Modified']
705 efb1f3d3 Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
706 efb1f3d3 Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
707 efb1f3d3 Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
708 efb1f3d3 Sofia Papagiannaki
709 efb1f3d3 Sofia Papagiannaki
        for tf in t_formats:
710 efb1f3d3 Sofia Papagiannaki
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=tf)
711 efb1f3d3 Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)