root / snf-pithos-app / pithos / api / test / public.py @ efb1f3d3
History | View | Annotate | Download (26 kB)
1 | bc4f25c0 | Sofia Papagiannaki | # Copyright 2011 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | bc4f25c0 | Sofia Papagiannaki | #
|
3 | bc4f25c0 | Sofia Papagiannaki | # Redistribution and use in source and binary forms, with or
|
4 | bc4f25c0 | Sofia Papagiannaki | # without modification, are permitted provided that the following
|
5 | bc4f25c0 | Sofia Papagiannaki | # conditions are met:
|
6 | bc4f25c0 | Sofia Papagiannaki | #
|
7 | bc4f25c0 | Sofia Papagiannaki | # 1. Redistributions of source code must retain the above
|
8 | bc4f25c0 | Sofia Papagiannaki | # copyright notice, this list of conditions and the following
|
9 | bc4f25c0 | Sofia Papagiannaki | # disclaimer.
|
10 | bc4f25c0 | Sofia Papagiannaki | #
|
11 | bc4f25c0 | Sofia Papagiannaki | # 2. Redistributions in binary form must reproduce the above
|
12 | bc4f25c0 | Sofia Papagiannaki | # copyright notice, this list of conditions and the following
|
13 | bc4f25c0 | Sofia Papagiannaki | # disclaimer in the documentation and/or other materials
|
14 | bc4f25c0 | Sofia Papagiannaki | # provided with the distribution.
|
15 | bc4f25c0 | Sofia Papagiannaki | #
|
16 | bc4f25c0 | Sofia Papagiannaki | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | bc4f25c0 | Sofia Papagiannaki | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | bc4f25c0 | Sofia Papagiannaki | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | bc4f25c0 | Sofia Papagiannaki | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | bc4f25c0 | Sofia Papagiannaki | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | bc4f25c0 | Sofia Papagiannaki | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | bc4f25c0 | Sofia Papagiannaki | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | bc4f25c0 | Sofia Papagiannaki | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | bc4f25c0 | Sofia Papagiannaki | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | bc4f25c0 | Sofia Papagiannaki | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | bc4f25c0 | Sofia Papagiannaki | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | bc4f25c0 | Sofia Papagiannaki | # POSSIBILITY OF SUCH DAMAGE.
|
28 | bc4f25c0 | Sofia Papagiannaki | #
|
29 | bc4f25c0 | Sofia Papagiannaki | # The views and conclusions contained in the software and
|
30 | bc4f25c0 | Sofia Papagiannaki | # documentation are those of the authors and should not be
|
31 | bc4f25c0 | Sofia Papagiannaki | # interpreted as representing official policies, either expressed
|
32 | bc4f25c0 | Sofia Papagiannaki | # or implied, of GRNET S.A.
|
33 | bc4f25c0 | Sofia Papagiannaki | |
34 | bc4f25c0 | Sofia Papagiannaki | import random |
35 | bc4f25c0 | Sofia Papagiannaki | import datetime |
36 | bc4f25c0 | Sofia Papagiannaki | import time as _time |
37 | efb1f3d3 | Sofia Papagiannaki | import re |
38 | efb1f3d3 | Sofia Papagiannaki | |
39 | efb1f3d3 | Sofia Papagiannaki | from functools import partial |
40 | bc4f25c0 | Sofia Papagiannaki | |
41 | bc4f25c0 | Sofia Papagiannaki | from synnefo.lib import join_urls |
42 | bc4f25c0 | Sofia Papagiannaki | |
43 | bc4f25c0 | Sofia Papagiannaki | import django.utils.simplejson as json |
44 | bc4f25c0 | Sofia Papagiannaki | |
45 | efb1f3d3 | Sofia Papagiannaki | from pithos.api.test import (PithosAPITest, DATE_FORMATS, TEST_BLOCK_SIZE, |
46 | efb1f3d3 | Sofia Papagiannaki | TEST_HASH_ALGORITHM) |
47 | efb1f3d3 | Sofia Papagiannaki | |
48 | efb1f3d3 | Sofia Papagiannaki | from pithos.api.test.util import (get_random_name, get_random_data, md5_hash, |
49 | efb1f3d3 | Sofia Papagiannaki | merkle) |
50 | bc4f25c0 | Sofia Papagiannaki | from pithos.api import settings as pithos_settings |
51 | bc4f25c0 | Sofia Papagiannaki | |
52 | efb1f3d3 | Sofia Papagiannaki | merkle = partial(merkle, |
53 | efb1f3d3 | Sofia Papagiannaki | blocksize=TEST_BLOCK_SIZE, |
54 | efb1f3d3 | Sofia Papagiannaki | blockhash=TEST_HASH_ALGORITHM) |
55 | efb1f3d3 | Sofia Papagiannaki | |
56 | bc4f25c0 | Sofia Papagiannaki | |
57 | bc4f25c0 | Sofia Papagiannaki | class TestPublic(PithosAPITest): |
58 | bc4f25c0 | Sofia Papagiannaki | def _assert_not_public_object(self, cname, oname): |
59 | bc4f25c0 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
60 | bc4f25c0 | Sofia Papagiannaki | self.assertTrue('X-Object-Public' not in info) |
61 | bc4f25c0 | Sofia Papagiannaki | |
62 | bc4f25c0 | Sofia Papagiannaki | def _assert_public_object(self, cname, oname, odata): |
63 | bc4f25c0 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
64 | bc4f25c0 | Sofia Papagiannaki | self.assertTrue('X-Object-Public' in info) |
65 | bc4f25c0 | Sofia Papagiannaki | public = info['X-Object-Public']
|
66 | bc4f25c0 | Sofia Papagiannaki | |
67 | bc4f25c0 | Sofia Papagiannaki | self.assertTrue(len(public) >= pithos_settings.PUBLIC_URL_SECURITY) |
68 | bc4f25c0 | Sofia Papagiannaki | (self.assertTrue(l in pithos_settings.PUBLIC_URL_ALPHABET) for |
69 | bc4f25c0 | Sofia Papagiannaki | l in public)
|
70 | bc4f25c0 | Sofia Papagiannaki | |
71 | f53483d8 | Sofia Papagiannaki | r = self.get(public, user='user2', token=None) |
72 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
73 | bc4f25c0 | Sofia Papagiannaki | self.assertTrue('X-Object-Public' not in r) |
74 | bc4f25c0 | Sofia Papagiannaki | |
75 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
76 | bc4f25c0 | Sofia Papagiannaki | |
77 | bc4f25c0 | Sofia Papagiannaki | # assert other users cannot access the object using the priavate path
|
78 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
79 | bc4f25c0 | Sofia Papagiannaki | r = self.head(url, user='user2') |
80 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 403) |
81 | bc4f25c0 | Sofia Papagiannaki | |
82 | bc4f25c0 | Sofia Papagiannaki | r = self.get(url, user='user2') |
83 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 403) |
84 | bc4f25c0 | Sofia Papagiannaki | |
85 | bc4f25c0 | Sofia Papagiannaki | return public
|
86 | bc4f25c0 | Sofia Papagiannaki | |
87 | bc4f25c0 | Sofia Papagiannaki | def test_set_object_public(self): |
88 | bc4f25c0 | Sofia Papagiannaki | cname = get_random_name() |
89 | bc4f25c0 | Sofia Papagiannaki | self.create_container(cname)
|
90 | bc4f25c0 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
91 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
92 | bc4f25c0 | Sofia Papagiannaki | |
93 | bc4f25c0 | Sofia Papagiannaki | # set public
|
94 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
95 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
96 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
97 | bc4f25c0 | Sofia Papagiannaki | |
98 | bc4f25c0 | Sofia Papagiannaki | self._assert_public_object(cname, oname, odata)
|
99 | bc4f25c0 | Sofia Papagiannaki | |
100 | bc4f25c0 | Sofia Papagiannaki | def test_set_twice(self): |
101 | bc4f25c0 | Sofia Papagiannaki | cname = get_random_name() |
102 | bc4f25c0 | Sofia Papagiannaki | self.create_container(cname)
|
103 | bc4f25c0 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
104 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
105 | bc4f25c0 | Sofia Papagiannaki | |
106 | bc4f25c0 | Sofia Papagiannaki | # set public
|
107 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
108 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
109 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
110 | bc4f25c0 | Sofia Papagiannaki | |
111 | bc4f25c0 | Sofia Papagiannaki | public = self._assert_public_object(cname, oname, odata)
|
112 | bc4f25c0 | Sofia Papagiannaki | |
113 | bc4f25c0 | Sofia Papagiannaki | # set public
|
114 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
115 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
116 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
117 | bc4f25c0 | Sofia Papagiannaki | |
118 | bc4f25c0 | Sofia Papagiannaki | public2 = self._assert_public_object(cname, oname, odata)
|
119 | bc4f25c0 | Sofia Papagiannaki | |
120 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(public, public2)
|
121 | bc4f25c0 | Sofia Papagiannaki | |
122 | bc4f25c0 | Sofia Papagiannaki | def test_set_unset_set(self): |
123 | bc4f25c0 | Sofia Papagiannaki | cname = get_random_name() |
124 | bc4f25c0 | Sofia Papagiannaki | self.create_container(cname)
|
125 | bc4f25c0 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
126 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
127 | bc4f25c0 | Sofia Papagiannaki | |
128 | bc4f25c0 | Sofia Papagiannaki | # set public
|
129 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
130 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
131 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
132 | bc4f25c0 | Sofia Papagiannaki | |
133 | bc4f25c0 | Sofia Papagiannaki | public = self._assert_public_object(cname, oname, odata)
|
134 | bc4f25c0 | Sofia Papagiannaki | |
135 | bc4f25c0 | Sofia Papagiannaki | # unset public
|
136 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
137 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false') |
138 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
139 | bc4f25c0 | Sofia Papagiannaki | |
140 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
141 | bc4f25c0 | Sofia Papagiannaki | |
142 | bc4f25c0 | Sofia Papagiannaki | # set public
|
143 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
144 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
145 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
146 | bc4f25c0 | Sofia Papagiannaki | |
147 | bc4f25c0 | Sofia Papagiannaki | public2 = self._assert_public_object(cname, oname, odata)
|
148 | bc4f25c0 | Sofia Papagiannaki | |
149 | bc4f25c0 | Sofia Papagiannaki | self.assertTrue(public != public2)
|
150 | bc4f25c0 | Sofia Papagiannaki | |
151 | bc4f25c0 | Sofia Papagiannaki | # unset public
|
152 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
153 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false') |
154 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
155 | bc4f25c0 | Sofia Papagiannaki | |
156 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
157 | bc4f25c0 | Sofia Papagiannaki | |
158 | bc4f25c0 | Sofia Papagiannaki | def test_update_public_object(self): |
159 | bc4f25c0 | Sofia Papagiannaki | cname = get_random_name() |
160 | bc4f25c0 | Sofia Papagiannaki | self.create_container(cname)
|
161 | bc4f25c0 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
162 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
163 | bc4f25c0 | Sofia Papagiannaki | |
164 | bc4f25c0 | Sofia Papagiannaki | # set public
|
165 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
166 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
167 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
168 | bc4f25c0 | Sofia Papagiannaki | |
169 | bc4f25c0 | Sofia Papagiannaki | public = self._assert_public_object(cname, oname, odata)
|
170 | bc4f25c0 | Sofia Papagiannaki | |
171 | bc4f25c0 | Sofia Papagiannaki | odata2 = self.append_object_data(cname, oname)[1] |
172 | bc4f25c0 | Sofia Papagiannaki | |
173 | bc4f25c0 | Sofia Papagiannaki | public2 = self._assert_public_object(cname, oname, odata + odata2)
|
174 | bc4f25c0 | Sofia Papagiannaki | |
175 | bc4f25c0 | Sofia Papagiannaki | self.assertTrue(public == public2)
|
176 | bc4f25c0 | Sofia Papagiannaki | |
177 | bc4f25c0 | Sofia Papagiannaki | def test_delete_public_object(self): |
178 | bc4f25c0 | Sofia Papagiannaki | cname = get_random_name() |
179 | bc4f25c0 | Sofia Papagiannaki | self.create_container(cname)
|
180 | bc4f25c0 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
181 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
182 | bc4f25c0 | Sofia Papagiannaki | |
183 | bc4f25c0 | Sofia Papagiannaki | # set public
|
184 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
185 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
186 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
187 | bc4f25c0 | Sofia Papagiannaki | public = self._assert_public_object(cname, oname, odata)
|
188 | bc4f25c0 | Sofia Papagiannaki | |
189 | bc4f25c0 | Sofia Papagiannaki | # delete object
|
190 | bc4f25c0 | Sofia Papagiannaki | r = self.delete(url)
|
191 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 204) |
192 | bc4f25c0 | Sofia Papagiannaki | r = self.get(url)
|
193 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 404) |
194 | bc4f25c0 | Sofia Papagiannaki | r = self.get(public)
|
195 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 404) |
196 | bc4f25c0 | Sofia Papagiannaki | |
197 | bc4f25c0 | Sofia Papagiannaki | def test_delete_public_object_history(self): |
198 | bc4f25c0 | Sofia Papagiannaki | cname = get_random_name() |
199 | bc4f25c0 | Sofia Papagiannaki | self.create_container(cname)
|
200 | bc4f25c0 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
201 | bc4f25c0 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
202 | bc4f25c0 | Sofia Papagiannaki | |
203 | bc4f25c0 | Sofia Papagiannaki | # set public
|
204 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
205 | bc4f25c0 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
206 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
207 | bc4f25c0 | Sofia Papagiannaki | public = self._assert_public_object(cname, oname, odata)
|
208 | bc4f25c0 | Sofia Papagiannaki | |
209 | bc4f25c0 | Sofia Papagiannaki | for _ in range(random.randint(1, 10)): |
210 | bc4f25c0 | Sofia Papagiannaki | odata += self.append_object_data(cname, oname)[1] |
211 | bc4f25c0 | Sofia Papagiannaki | _time.sleep(1)
|
212 | bc4f25c0 | Sofia Papagiannaki | |
213 | bc4f25c0 | Sofia Papagiannaki | # get object versions
|
214 | bc4f25c0 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
215 | bc4f25c0 | Sofia Papagiannaki | r = self.get('%s?version=list&format=json' % url) |
216 | bc4f25c0 | Sofia Papagiannaki | version_list = json.loads(r.content)['versions']
|
217 | f5bbcb5d | Sofia Papagiannaki | mtime = [int(float(t[1])) for t in version_list] |
218 | bc4f25c0 | Sofia Papagiannaki | |
219 | bc4f25c0 | Sofia Papagiannaki | # delete object history
|
220 | bc4f25c0 | Sofia Papagiannaki | i = random.randrange(len(mtime))
|
221 | bc4f25c0 | Sofia Papagiannaki | self.delete('%s?until=%d' % (url, mtime[i])) |
222 | bc4f25c0 | Sofia Papagiannaki | public2 = self._assert_public_object(cname, oname, odata)
|
223 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(public, public2)
|
224 | bc4f25c0 | Sofia Papagiannaki | |
225 | f53483d8 | Sofia Papagiannaki | # delete object history until now
|
226 | bc4f25c0 | Sofia Papagiannaki | _time.sleep(1)
|
227 | bc4f25c0 | Sofia Papagiannaki | t = datetime.datetime.utcnow() |
228 | bc4f25c0 | Sofia Papagiannaki | now = int(_time.mktime(t.timetuple()))
|
229 | bc4f25c0 | Sofia Papagiannaki | r = self.delete('%s?intil=%d' % (url, now)) |
230 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 204) |
231 | bc4f25c0 | Sofia Papagiannaki | r = self.get(url)
|
232 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 404) |
233 | bc4f25c0 | Sofia Papagiannaki | r = self.get(public)
|
234 | bc4f25c0 | Sofia Papagiannaki | self.assertEqual(r.status_code, 404) |
235 | f53483d8 | Sofia Papagiannaki | |
236 | f53483d8 | Sofia Papagiannaki | def test_public_manifest(self): |
237 | f53483d8 | Sofia Papagiannaki | cname = self.create_container()[0] |
238 | f53483d8 | Sofia Papagiannaki | prefix = 'myobject/'
|
239 | f53483d8 | Sofia Papagiannaki | data = ''
|
240 | f53483d8 | Sofia Papagiannaki | for i in range(random.randint(2, 10)): |
241 | f53483d8 | Sofia Papagiannaki | part = '%s%d' % (prefix, i)
|
242 | f53483d8 | Sofia Papagiannaki | data += self.upload_object(cname, oname=part)[1] |
243 | f53483d8 | Sofia Papagiannaki | |
244 | f53483d8 | Sofia Papagiannaki | manifest = '%s/%s' % (cname, prefix)
|
245 | f53483d8 | Sofia Papagiannaki | oname = get_random_name() |
246 | f53483d8 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
247 | f53483d8 | Sofia Papagiannaki | r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest, |
248 | f53483d8 | Sofia Papagiannaki | HTTP_X_OBJECT_PUBLIC='true')
|
249 | f53483d8 | Sofia Papagiannaki | self.assertEqual(r.status_code, 201) |
250 | f53483d8 | Sofia Papagiannaki | |
251 | f53483d8 | Sofia Papagiannaki | r = self.head(url)
|
252 | f53483d8 | Sofia Papagiannaki | self.assertTrue('X-Object-Manifest' in r) |
253 | f53483d8 | Sofia Papagiannaki | self.assertEqual(r['X-Object-Manifest'], manifest) |
254 | f53483d8 | Sofia Papagiannaki | |
255 | f53483d8 | Sofia Papagiannaki | self.assertTrue('X-Object-Public' in r) |
256 | f53483d8 | Sofia Papagiannaki | public = r['X-Object-Public']
|
257 | f53483d8 | Sofia Papagiannaki | |
258 | f53483d8 | Sofia Papagiannaki | r = self.get(public)
|
259 | f53483d8 | Sofia Papagiannaki | self.assertTrue(r.content, data)
|
260 | f53483d8 | Sofia Papagiannaki | #self.assertTrue('X-Object-Manifest' in r)
|
261 | f53483d8 | Sofia Papagiannaki | #self.assertEqual(r['X-Object-Manifest'], manifest)
|
262 | efb1f3d3 | Sofia Papagiannaki | |
263 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_partial(self): |
264 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
265 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname, length=512)[:-1] |
266 | efb1f3d3 | Sofia Papagiannaki | |
267 | efb1f3d3 | Sofia Papagiannaki | # set public
|
268 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
269 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
270 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
271 | efb1f3d3 | Sofia Papagiannaki | |
272 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
273 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
274 | efb1f3d3 | Sofia Papagiannaki | |
275 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_RANGE='bytes=0-499') |
276 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 206) |
277 | efb1f3d3 | Sofia Papagiannaki | data = r.content |
278 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(data, odata[:500]) |
279 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('Content-Range' in r) |
280 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata)) |
281 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('Content-Type' in r) |
282 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue(r['Content-Type'], 'application/octet-stream') |
283 | efb1f3d3 | Sofia Papagiannaki | |
284 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_final_500(self): |
285 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
286 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname, length=512)[:-1] |
287 | efb1f3d3 | Sofia Papagiannaki | size = len(odata)
|
288 | efb1f3d3 | Sofia Papagiannaki | |
289 | efb1f3d3 | Sofia Papagiannaki | # set public
|
290 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
291 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
292 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
293 | efb1f3d3 | Sofia Papagiannaki | |
294 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
295 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
296 | efb1f3d3 | Sofia Papagiannaki | |
297 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_RANGE='bytes=-500') |
298 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 206) |
299 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata[-500:]) |
300 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('Content-Range' in r) |
301 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r['Content-Range'], |
302 | efb1f3d3 | Sofia Papagiannaki | 'bytes %s-%s/%s' % (size - 500, size - 1, size)) |
303 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('Content-Type' in r) |
304 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue(r['Content-Type'], 'application/octet-stream') |
305 | efb1f3d3 | Sofia Papagiannaki | |
306 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_rest(self): |
307 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
308 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname, length=512)[:-1] |
309 | efb1f3d3 | Sofia Papagiannaki | size = len(odata)
|
310 | efb1f3d3 | Sofia Papagiannaki | offset = len(odata) - random.randint(1, 512) |
311 | efb1f3d3 | Sofia Papagiannaki | |
312 | efb1f3d3 | Sofia Papagiannaki | # set public
|
313 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
314 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
315 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
316 | efb1f3d3 | Sofia Papagiannaki | |
317 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
318 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
319 | efb1f3d3 | Sofia Papagiannaki | |
320 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_RANGE='bytes=%s-' % offset) |
321 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 206) |
322 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata[offset:])
|
323 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('Content-Range' in r) |
324 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r['Content-Range'], |
325 | efb1f3d3 | Sofia Papagiannaki | 'bytes %s-%s/%s' % (offset, size - 1, size)) |
326 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('Content-Type' in r) |
327 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue(r['Content-Type'], 'application/octet-stream') |
328 | efb1f3d3 | Sofia Papagiannaki | |
329 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_range_not_satisfiable(self): |
330 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
331 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname, length=512)[:-1] |
332 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
333 | efb1f3d3 | Sofia Papagiannaki | |
334 | efb1f3d3 | Sofia Papagiannaki | offset = len(odata) + 1 |
335 | efb1f3d3 | Sofia Papagiannaki | |
336 | efb1f3d3 | Sofia Papagiannaki | # set public
|
337 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
338 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
339 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
340 | efb1f3d3 | Sofia Papagiannaki | |
341 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
342 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
343 | efb1f3d3 | Sofia Papagiannaki | |
344 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_RANGE='bytes=0-%s' % offset) |
345 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 416) |
346 | efb1f3d3 | Sofia Papagiannaki | |
347 | efb1f3d3 | Sofia Papagiannaki | def test_public_multiple_range(self): |
348 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
349 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
350 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
351 | efb1f3d3 | Sofia Papagiannaki | |
352 | efb1f3d3 | Sofia Papagiannaki | # set public
|
353 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
354 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
355 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
356 | efb1f3d3 | Sofia Papagiannaki | |
357 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
358 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
359 | efb1f3d3 | Sofia Papagiannaki | |
360 | efb1f3d3 | Sofia Papagiannaki | l = ['0-499', '-500', '1000-'] |
361 | efb1f3d3 | Sofia Papagiannaki | ranges = 'bytes=%s' % ','.join(l) |
362 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_RANGE=ranges)
|
363 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 206) |
364 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue('content-type' in r) |
365 | efb1f3d3 | Sofia Papagiannaki | p = re.compile( |
366 | efb1f3d3 | Sofia Papagiannaki | 'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
|
367 | efb1f3d3 | Sofia Papagiannaki | re.I) |
368 | efb1f3d3 | Sofia Papagiannaki | m = p.match(r['content-type'])
|
369 | efb1f3d3 | Sofia Papagiannaki | if m is None: |
370 | efb1f3d3 | Sofia Papagiannaki | self.fail('Invalid multiple range content type') |
371 | efb1f3d3 | Sofia Papagiannaki | boundary = m.groupdict()['boundary']
|
372 | efb1f3d3 | Sofia Papagiannaki | cparts = r.content.split('--%s' % boundary)[1:-1] |
373 | efb1f3d3 | Sofia Papagiannaki | |
374 | efb1f3d3 | Sofia Papagiannaki | # assert content parts length
|
375 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(len(cparts), len(l)) |
376 | efb1f3d3 | Sofia Papagiannaki | |
377 | efb1f3d3 | Sofia Papagiannaki | # for each content part assert headers
|
378 | efb1f3d3 | Sofia Papagiannaki | i = 0
|
379 | efb1f3d3 | Sofia Papagiannaki | for cpart in cparts: |
380 | efb1f3d3 | Sofia Papagiannaki | content = cpart.split('\r\n')
|
381 | efb1f3d3 | Sofia Papagiannaki | headers = content[1:3] |
382 | efb1f3d3 | Sofia Papagiannaki | content_range = headers[0].split(': ') |
383 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(content_range[0], 'Content-Range') |
384 | efb1f3d3 | Sofia Papagiannaki | |
385 | efb1f3d3 | Sofia Papagiannaki | r = l[i].split('-')
|
386 | efb1f3d3 | Sofia Papagiannaki | if not r[0] and not r[1]: |
387 | efb1f3d3 | Sofia Papagiannaki | pass
|
388 | efb1f3d3 | Sofia Papagiannaki | elif not r[0]: |
389 | efb1f3d3 | Sofia Papagiannaki | start = len(odata) - int(r[1]) |
390 | efb1f3d3 | Sofia Papagiannaki | end = len(odata)
|
391 | efb1f3d3 | Sofia Papagiannaki | elif not r[1]: |
392 | efb1f3d3 | Sofia Papagiannaki | start = int(r[0]) |
393 | efb1f3d3 | Sofia Papagiannaki | end = len(odata)
|
394 | efb1f3d3 | Sofia Papagiannaki | else:
|
395 | efb1f3d3 | Sofia Papagiannaki | start = int(r[0]) |
396 | efb1f3d3 | Sofia Papagiannaki | end = int(r[1]) + 1 |
397 | efb1f3d3 | Sofia Papagiannaki | fdata = odata[start:end] |
398 | efb1f3d3 | Sofia Papagiannaki | sdata = '\r\n'.join(content[4:-1]) |
399 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(len(fdata), len(sdata)) |
400 | efb1f3d3 | Sofia Papagiannaki | self.assertEquals(fdata, sdata)
|
401 | efb1f3d3 | Sofia Papagiannaki | i += 1
|
402 | efb1f3d3 | Sofia Papagiannaki | |
403 | efb1f3d3 | Sofia Papagiannaki | def test_public_multiple_range_not_satisfiable(self): |
404 | efb1f3d3 | Sofia Papagiannaki | # perform get with multiple range
|
405 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
406 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
407 | efb1f3d3 | Sofia Papagiannaki | |
408 | efb1f3d3 | Sofia Papagiannaki | # set public
|
409 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
410 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
411 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
412 | efb1f3d3 | Sofia Papagiannaki | |
413 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
414 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
415 | efb1f3d3 | Sofia Papagiannaki | |
416 | efb1f3d3 | Sofia Papagiannaki | out_of_range = len(odata) + 1 |
417 | efb1f3d3 | Sofia Papagiannaki | l = ['0-499', '-500', '%d-' % out_of_range] |
418 | efb1f3d3 | Sofia Papagiannaki | ranges = 'bytes=%s' % ','.join(l) |
419 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_RANGE=ranges)
|
420 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 416) |
421 | efb1f3d3 | Sofia Papagiannaki | |
422 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_if_match(self): |
423 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
424 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
425 | efb1f3d3 | Sofia Papagiannaki | |
426 | efb1f3d3 | Sofia Papagiannaki | # set public
|
427 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
428 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
429 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
430 | efb1f3d3 | Sofia Papagiannaki | |
431 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
432 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
433 | efb1f3d3 | Sofia Papagiannaki | |
434 | efb1f3d3 | Sofia Papagiannaki | def assert_matches(etag): |
435 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_MATCH=etag)
|
436 | efb1f3d3 | Sofia Papagiannaki | |
437 | efb1f3d3 | Sofia Papagiannaki | # assert get success
|
438 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
439 | efb1f3d3 | Sofia Papagiannaki | |
440 | efb1f3d3 | Sofia Papagiannaki | # assert response content
|
441 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
442 | efb1f3d3 | Sofia Papagiannaki | |
443 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-Match
|
444 | efb1f3d3 | Sofia Papagiannaki | if pithos_settings.UPDATE_MD5:
|
445 | efb1f3d3 | Sofia Papagiannaki | assert_matches(md5_hash(odata)) |
446 | efb1f3d3 | Sofia Papagiannaki | else:
|
447 | efb1f3d3 | Sofia Papagiannaki | assert_matches(merkle(odata)) |
448 | efb1f3d3 | Sofia Papagiannaki | |
449 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_if_match_star(self): |
450 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
451 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
452 | efb1f3d3 | Sofia Papagiannaki | |
453 | efb1f3d3 | Sofia Papagiannaki | # set public
|
454 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
455 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
456 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
457 | efb1f3d3 | Sofia Papagiannaki | |
458 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
459 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
460 | efb1f3d3 | Sofia Papagiannaki | |
461 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-Match *
|
462 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_MATCH='*') |
463 | efb1f3d3 | Sofia Papagiannaki | |
464 | efb1f3d3 | Sofia Papagiannaki | # assert get success
|
465 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
466 | efb1f3d3 | Sofia Papagiannaki | |
467 | efb1f3d3 | Sofia Papagiannaki | # assert response content
|
468 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
469 | efb1f3d3 | Sofia Papagiannaki | |
470 | efb1f3d3 | Sofia Papagiannaki | def test_public_get_multiple_if_match(self): |
471 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
472 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
473 | efb1f3d3 | Sofia Papagiannaki | |
474 | efb1f3d3 | Sofia Papagiannaki | # set public
|
475 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
476 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
477 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
478 | efb1f3d3 | Sofia Papagiannaki | |
479 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
480 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
481 | efb1f3d3 | Sofia Papagiannaki | |
482 | efb1f3d3 | Sofia Papagiannaki | def assert_multiple_match(etag): |
483 | efb1f3d3 | Sofia Papagiannaki | quoted = lambda s: '"%s"' % s |
484 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_MATCH=','.join( |
485 | efb1f3d3 | Sofia Papagiannaki | [quoted(etag), quoted(get_random_data(64))]))
|
486 | efb1f3d3 | Sofia Papagiannaki | |
487 | efb1f3d3 | Sofia Papagiannaki | # assert get success
|
488 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
489 | efb1f3d3 | Sofia Papagiannaki | |
490 | efb1f3d3 | Sofia Papagiannaki | # assert response content
|
491 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
492 | efb1f3d3 | Sofia Papagiannaki | |
493 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-Match
|
494 | efb1f3d3 | Sofia Papagiannaki | if pithos_settings.UPDATE_MD5:
|
495 | efb1f3d3 | Sofia Papagiannaki | assert_multiple_match(md5_hash(odata)) |
496 | efb1f3d3 | Sofia Papagiannaki | else:
|
497 | efb1f3d3 | Sofia Papagiannaki | assert_multiple_match(merkle(odata)) |
498 | efb1f3d3 | Sofia Papagiannaki | |
499 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_match_precondition_failed(self): |
500 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
501 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
502 | efb1f3d3 | Sofia Papagiannaki | |
503 | efb1f3d3 | Sofia Papagiannaki | # set public
|
504 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
505 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
506 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
507 | efb1f3d3 | Sofia Papagiannaki | |
508 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
509 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
510 | efb1f3d3 | Sofia Papagiannaki | |
511 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-Match
|
512 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_MATCH=get_random_name())
|
513 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 412) |
514 | efb1f3d3 | Sofia Papagiannaki | |
515 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_none_match(self): |
516 | efb1f3d3 | Sofia Papagiannaki | # upload object
|
517 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
518 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
519 | efb1f3d3 | Sofia Papagiannaki | |
520 | efb1f3d3 | Sofia Papagiannaki | # set public
|
521 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
522 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
523 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
524 | efb1f3d3 | Sofia Papagiannaki | |
525 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
526 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
527 | efb1f3d3 | Sofia Papagiannaki | |
528 | efb1f3d3 | Sofia Papagiannaki | def assert_non_match(etag): |
529 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-None-Match
|
530 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_NONE_MATCH=etag)
|
531 | efb1f3d3 | Sofia Papagiannaki | |
532 | efb1f3d3 | Sofia Papagiannaki | # assert precondition_failed
|
533 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 304) |
534 | efb1f3d3 | Sofia Papagiannaki | |
535 | efb1f3d3 | Sofia Papagiannaki | # update object data
|
536 | efb1f3d3 | Sofia Papagiannaki | r = self.append_object_data(cname, oname)[-1] |
537 | efb1f3d3 | Sofia Papagiannaki | self.assertTrue(etag != r['ETag']) |
538 | efb1f3d3 | Sofia Papagiannaki | |
539 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-None-Match
|
540 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_NONE_MATCH=etag)
|
541 | efb1f3d3 | Sofia Papagiannaki | |
542 | efb1f3d3 | Sofia Papagiannaki | # assert get success
|
543 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
544 | efb1f3d3 | Sofia Papagiannaki | |
545 | efb1f3d3 | Sofia Papagiannaki | if pithos_settings.UPDATE_MD5:
|
546 | efb1f3d3 | Sofia Papagiannaki | assert_non_match(md5_hash(odata)) |
547 | efb1f3d3 | Sofia Papagiannaki | else:
|
548 | efb1f3d3 | Sofia Papagiannaki | assert_non_match(merkle(odata)) |
549 | efb1f3d3 | Sofia Papagiannaki | |
550 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_none_match_star(self): |
551 | efb1f3d3 | Sofia Papagiannaki | # upload object
|
552 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
553 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
554 | efb1f3d3 | Sofia Papagiannaki | |
555 | efb1f3d3 | Sofia Papagiannaki | # set public
|
556 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
557 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
558 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
559 | efb1f3d3 | Sofia Papagiannaki | |
560 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
561 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
562 | efb1f3d3 | Sofia Papagiannaki | |
563 | efb1f3d3 | Sofia Papagiannaki | # perform get with If-None-Match with star
|
564 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_NONE_MATCH='*') |
565 | efb1f3d3 | Sofia Papagiannaki | |
566 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 304) |
567 | efb1f3d3 | Sofia Papagiannaki | |
568 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_modified_sinse(self): |
569 | efb1f3d3 | Sofia Papagiannaki | cname = get_random_name() |
570 | efb1f3d3 | Sofia Papagiannaki | self.create_container(cname)
|
571 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
572 | efb1f3d3 | Sofia Papagiannaki | self._assert_not_public_object(cname, oname)
|
573 | efb1f3d3 | Sofia Papagiannaki | |
574 | efb1f3d3 | Sofia Papagiannaki | # set public
|
575 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
576 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
577 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
578 | efb1f3d3 | Sofia Papagiannaki | |
579 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
580 | efb1f3d3 | Sofia Papagiannaki | public = info['X-Object-Public']
|
581 | efb1f3d3 | Sofia Papagiannaki | |
582 | efb1f3d3 | Sofia Papagiannaki | object_info = self.get_object_info(cname, oname)
|
583 | efb1f3d3 | Sofia Papagiannaki | last_modified = object_info['Last-Modified']
|
584 | efb1f3d3 | Sofia Papagiannaki | t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
|
585 | efb1f3d3 | Sofia Papagiannaki | t1_formats = map(t1.strftime, DATE_FORMATS)
|
586 | efb1f3d3 | Sofia Papagiannaki | |
587 | efb1f3d3 | Sofia Papagiannaki | for t in t1_formats: |
588 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public, user='user2', HTTP_IF_MODIFIED_SINCE=t, |
589 | efb1f3d3 | Sofia Papagiannaki | token=None)
|
590 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 304) |
591 | efb1f3d3 | Sofia Papagiannaki | |
592 | efb1f3d3 | Sofia Papagiannaki | _time.sleep(1)
|
593 | efb1f3d3 | Sofia Papagiannaki | |
594 | efb1f3d3 | Sofia Papagiannaki | # update object data
|
595 | efb1f3d3 | Sofia Papagiannaki | appended_data = self.append_object_data(cname, oname)[1] |
596 | efb1f3d3 | Sofia Papagiannaki | |
597 | efb1f3d3 | Sofia Papagiannaki | # Check modified since
|
598 | efb1f3d3 | Sofia Papagiannaki | for t in t1_formats: |
599 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public, user='user2', HTTP_IF_MODIFIED_SINCE=t, |
600 | efb1f3d3 | Sofia Papagiannaki | token=None)
|
601 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
602 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata + appended_data)
|
603 | efb1f3d3 | Sofia Papagiannaki | |
604 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_modified_since_invalid_date(self): |
605 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
606 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
607 | efb1f3d3 | Sofia Papagiannaki | |
608 | efb1f3d3 | Sofia Papagiannaki | # set public
|
609 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
610 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
611 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
612 | efb1f3d3 | Sofia Papagiannaki | |
613 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
614 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
615 | efb1f3d3 | Sofia Papagiannaki | |
616 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_MODIFIED_SINCE='Monday') |
617 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
618 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
619 | efb1f3d3 | Sofia Papagiannaki | |
620 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_public_not_modified_since(self): |
621 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
622 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
623 | efb1f3d3 | Sofia Papagiannaki | |
624 | efb1f3d3 | Sofia Papagiannaki | # set public
|
625 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
626 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
627 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
628 | efb1f3d3 | Sofia Papagiannaki | |
629 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
630 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
631 | efb1f3d3 | Sofia Papagiannaki | last_modified = info['Last-Modified']
|
632 | efb1f3d3 | Sofia Papagiannaki | t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
|
633 | efb1f3d3 | Sofia Papagiannaki | |
634 | efb1f3d3 | Sofia Papagiannaki | # Check unmodified
|
635 | efb1f3d3 | Sofia Papagiannaki | t1 = t + datetime.timedelta(seconds=1)
|
636 | efb1f3d3 | Sofia Papagiannaki | t1_formats = map(t1.strftime, DATE_FORMATS)
|
637 | efb1f3d3 | Sofia Papagiannaki | for t in t1_formats: |
638 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
639 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
640 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
641 | efb1f3d3 | Sofia Papagiannaki | |
642 | efb1f3d3 | Sofia Papagiannaki | # modify object
|
643 | efb1f3d3 | Sofia Papagiannaki | _time.sleep(2)
|
644 | efb1f3d3 | Sofia Papagiannaki | self.append_object_data(cname, oname)
|
645 | efb1f3d3 | Sofia Papagiannaki | |
646 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
647 | efb1f3d3 | Sofia Papagiannaki | last_modified = info['Last-Modified']
|
648 | efb1f3d3 | Sofia Papagiannaki | t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
|
649 | efb1f3d3 | Sofia Papagiannaki | t2 = t - datetime.timedelta(seconds=1)
|
650 | efb1f3d3 | Sofia Papagiannaki | t2_formats = map(t2.strftime, DATE_FORMATS)
|
651 | efb1f3d3 | Sofia Papagiannaki | |
652 | efb1f3d3 | Sofia Papagiannaki | # check modified
|
653 | efb1f3d3 | Sofia Papagiannaki | for t in t2_formats: |
654 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
655 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 412) |
656 | efb1f3d3 | Sofia Papagiannaki | |
657 | efb1f3d3 | Sofia Papagiannaki | # modify account: update object meta
|
658 | efb1f3d3 | Sofia Papagiannaki | _time.sleep(1)
|
659 | efb1f3d3 | Sofia Papagiannaki | self.update_object_meta(cname, oname, {'foo': 'bar'}) |
660 | efb1f3d3 | Sofia Papagiannaki | |
661 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
662 | efb1f3d3 | Sofia Papagiannaki | last_modified = info['Last-Modified']
|
663 | efb1f3d3 | Sofia Papagiannaki | t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
|
664 | efb1f3d3 | Sofia Papagiannaki | t3 = t - datetime.timedelta(seconds=1)
|
665 | efb1f3d3 | Sofia Papagiannaki | t3_formats = map(t3.strftime, DATE_FORMATS)
|
666 | efb1f3d3 | Sofia Papagiannaki | |
667 | efb1f3d3 | Sofia Papagiannaki | # check modified
|
668 | efb1f3d3 | Sofia Papagiannaki | for t in t3_formats: |
669 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
|
670 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 412) |
671 | efb1f3d3 | Sofia Papagiannaki | |
672 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_unmodified_since(self): |
673 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
674 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
675 | efb1f3d3 | Sofia Papagiannaki | |
676 | efb1f3d3 | Sofia Papagiannaki | # set public
|
677 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
678 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
679 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
680 | efb1f3d3 | Sofia Papagiannaki | |
681 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
682 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
683 | efb1f3d3 | Sofia Papagiannaki | last_modified = info['Last-Modified']
|
684 | efb1f3d3 | Sofia Papagiannaki | t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
|
685 | efb1f3d3 | Sofia Papagiannaki | t = t + datetime.timedelta(seconds=1)
|
686 | efb1f3d3 | Sofia Papagiannaki | t_formats = map(t.strftime, DATE_FORMATS)
|
687 | efb1f3d3 | Sofia Papagiannaki | |
688 | efb1f3d3 | Sofia Papagiannaki | for tf in t_formats: |
689 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=tf)
|
690 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 200) |
691 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.content, odata)
|
692 | efb1f3d3 | Sofia Papagiannaki | |
693 | efb1f3d3 | Sofia Papagiannaki | def test_public_if_unmodified_since_precondition_failed(self): |
694 | efb1f3d3 | Sofia Papagiannaki | cname = self.create_container()[0] |
695 | efb1f3d3 | Sofia Papagiannaki | oname, odata = self.upload_object(cname)[:-1] |
696 | efb1f3d3 | Sofia Papagiannaki | |
697 | efb1f3d3 | Sofia Papagiannaki | # set public
|
698 | efb1f3d3 | Sofia Papagiannaki | url = join_urls(self.pithos_path, self.user, cname, oname) |
699 | efb1f3d3 | Sofia Papagiannaki | r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true') |
700 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 202) |
701 | efb1f3d3 | Sofia Papagiannaki | |
702 | efb1f3d3 | Sofia Papagiannaki | info = self.get_object_info(cname, oname)
|
703 | efb1f3d3 | Sofia Papagiannaki | public_url = info['X-Object-Public']
|
704 | efb1f3d3 | Sofia Papagiannaki | last_modified = info['Last-Modified']
|
705 | efb1f3d3 | Sofia Papagiannaki | t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
|
706 | efb1f3d3 | Sofia Papagiannaki | t = t - datetime.timedelta(seconds=1)
|
707 | efb1f3d3 | Sofia Papagiannaki | t_formats = map(t.strftime, DATE_FORMATS)
|
708 | efb1f3d3 | Sofia Papagiannaki | |
709 | efb1f3d3 | Sofia Papagiannaki | for tf in t_formats: |
710 | efb1f3d3 | Sofia Papagiannaki | r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=tf)
|
711 | efb1f3d3 | Sofia Papagiannaki | self.assertEqual(r.status_code, 412) |