Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 2aba7764

History | View | Annotate | Download (74.8 kB)

1 3a19e99b Sofia Papagiannaki
#!/usr/bin/env python
2 3a19e99b Sofia Papagiannaki
#coding=utf8
3 3a19e99b Sofia Papagiannaki
4 3a19e99b Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 3a19e99b Sofia Papagiannaki
#
6 3a19e99b Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 3a19e99b Sofia Papagiannaki
# without modification, are permitted provided that the following
8 3a19e99b Sofia Papagiannaki
# conditions are met:
9 3a19e99b Sofia Papagiannaki
#
10 3a19e99b Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 3a19e99b Sofia Papagiannaki
#      disclaimer.
13 3a19e99b Sofia Papagiannaki
#
14 3a19e99b Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 3a19e99b Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 3a19e99b Sofia Papagiannaki
#      provided with the distribution.
18 3a19e99b Sofia Papagiannaki
#
19 3a19e99b Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 3a19e99b Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 3a19e99b Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 3a19e99b Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 3a19e99b Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 3a19e99b Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 3a19e99b Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 3a19e99b Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 3a19e99b Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 3a19e99b Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 3a19e99b Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 3a19e99b Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 3a19e99b Sofia Papagiannaki
#
32 3a19e99b Sofia Papagiannaki
# The views and conclusions contained in the software and
33 3a19e99b Sofia Papagiannaki
# documentation are those of the authors and should not be
34 3a19e99b Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 3a19e99b Sofia Papagiannaki
# or implied, of GRNET S.A.
36 3a19e99b Sofia Papagiannaki
37 3a19e99b Sofia Papagiannaki
from collections import defaultdict
38 f53483d8 Sofia Papagiannaki
from urllib import quote, unquote
39 5fe43b8c Sofia Papagiannaki
from functools import partial
40 3a19e99b Sofia Papagiannaki
41 3a19e99b Sofia Papagiannaki
from pithos.api.test import (PithosAPITest, pithos_settings,
42 3a19e99b Sofia Papagiannaki
                             AssertMappingInvariant, AssertUUidInvariant,
43 5fe43b8c Sofia Papagiannaki
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44 3a19e99b Sofia Papagiannaki
                             DATE_FORMATS)
45 bc4f25c0 Sofia Papagiannaki
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46 bc4f25c0 Sofia Papagiannaki
                                  get_random_data, get_random_name)
47 3a19e99b Sofia Papagiannaki
48 3a19e99b Sofia Papagiannaki
from synnefo.lib import join_urls
49 3a19e99b Sofia Papagiannaki
50 3a19e99b Sofia Papagiannaki
import django.utils.simplejson as json
51 3a19e99b Sofia Papagiannaki
52 3a19e99b Sofia Papagiannaki
import random
53 3a19e99b Sofia Papagiannaki
import re
54 3a19e99b Sofia Papagiannaki
import datetime
55 5fe43b8c Sofia Papagiannaki
import time as _time
56 5fe43b8c Sofia Papagiannaki
57 5fe43b8c Sofia Papagiannaki
merkle = partial(merkle,
58 5fe43b8c Sofia Papagiannaki
                 blocksize=TEST_BLOCK_SIZE,
59 5fe43b8c Sofia Papagiannaki
                 blockhash=TEST_HASH_ALGORITHM)
60 3a19e99b Sofia Papagiannaki
61 3a19e99b Sofia Papagiannaki
62 2a194295 Sofia Papagiannaki
class ObjectHead(PithosAPITest):
63 f53483d8 Sofia Papagiannaki
    def test_get_object_meta(self):
64 2a194295 Sofia Papagiannaki
        cname = self.create_container()[0]
65 2a194295 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
66 2a194295 Sofia Papagiannaki
67 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
68 f53483d8 Sofia Papagiannaki
        r = self.head(url)
69 f53483d8 Sofia Papagiannaki
70 f53483d8 Sofia Papagiannaki
        mandatory = ['Etag',
71 f53483d8 Sofia Papagiannaki
                     'Content-Length',
72 f53483d8 Sofia Papagiannaki
                     'Content-Type',
73 f53483d8 Sofia Papagiannaki
                     'Last-Modified',
74 f53483d8 Sofia Papagiannaki
                     'X-Object-Hash',
75 f53483d8 Sofia Papagiannaki
                     'X-Object-UUID',
76 f53483d8 Sofia Papagiannaki
                     'X-Object-Version',
77 f53483d8 Sofia Papagiannaki
                     'X-Object-Version-Timestamp',
78 f53483d8 Sofia Papagiannaki
                     'X-Object-Modified-By']
79 f53483d8 Sofia Papagiannaki
        for i in mandatory:
80 f53483d8 Sofia Papagiannaki
            self.assertTrue(i in r)
81 f53483d8 Sofia Papagiannaki
82 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='',
83 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_ENCODING='gzip',
84 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_DISPOSITION=(
85 f53483d8 Sofia Papagiannaki
                          'attachment; filename="%s"' % oname))
86 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
87 f53483d8 Sofia Papagiannaki
88 2a194295 Sofia Papagiannaki
        r = self.head(url)
89 f53483d8 Sofia Papagiannaki
        for i in mandatory:
90 f53483d8 Sofia Papagiannaki
            self.assertTrue(i in r)
91 f53483d8 Sofia Papagiannaki
        self.assertTrue('Content-Encoding' in r)
92 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['Content-Encoding'], 'gzip')
93 f53483d8 Sofia Papagiannaki
        self.assertTrue('Content-Disposition' in r)
94 f53483d8 Sofia Papagiannaki
        self.assertEqual(unquote(r['Content-Disposition']),
95 f53483d8 Sofia Papagiannaki
                         'attachment; filename="%s"' % oname)
96 f53483d8 Sofia Papagiannaki
97 f53483d8 Sofia Papagiannaki
        prefix = 'myobject/'
98 f53483d8 Sofia Papagiannaki
        data = ''
99 f53483d8 Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
100 f53483d8 Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
101 f53483d8 Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
102 f53483d8 Sofia Papagiannaki
103 f53483d8 Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
104 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
105 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
106 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
108 f53483d8 Sofia Papagiannaki
109 f53483d8 Sofia Papagiannaki
        r = self.head(url)
110 f53483d8 Sofia Papagiannaki
        for i in mandatory:
111 f53483d8 Sofia Papagiannaki
            self.assertTrue(i in r)
112 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Manifest' in r)
113 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Manifest'], manifest)
114 2a194295 Sofia Papagiannaki
115 2a194295 Sofia Papagiannaki
116 3a19e99b Sofia Papagiannaki
class ObjectGet(PithosAPITest):
117 3a19e99b Sofia Papagiannaki
    def setUp(self):
118 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
119 3a19e99b Sofia Papagiannaki
        self.containers = ['c1', 'c2']
120 3a19e99b Sofia Papagiannaki
121 3a19e99b Sofia Papagiannaki
        # create some containers
122 3a19e99b Sofia Papagiannaki
        for c in self.containers:
123 3a19e99b Sofia Papagiannaki
            self.create_container(c)
124 3a19e99b Sofia Papagiannaki
125 3a19e99b Sofia Papagiannaki
        # upload files
126 3a19e99b Sofia Papagiannaki
        self.objects = defaultdict(list)
127 3a19e99b Sofia Papagiannaki
        self.objects['c1'].append(self.upload_object('c1')[0])
128 3a19e99b Sofia Papagiannaki
129 3a19e99b Sofia Papagiannaki
    def test_versions(self):
130 3a19e99b Sofia Papagiannaki
        c = 'c1'
131 3a19e99b Sofia Papagiannaki
        o = self.objects[c][0]
132 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
133 3a19e99b Sofia Papagiannaki
134 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
136 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
137 3a19e99b Sofia Papagiannaki
138 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
139 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
140 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
141 3a19e99b Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
142 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
143 3a19e99b Sofia Papagiannaki
144 3a19e99b Sofia Papagiannaki
        # update meta
145 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146 3a19e99b Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
148 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
149 3a19e99b Sofia Papagiannaki
150 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
151 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
152 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
153 3a19e99b Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
154 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
155 3a19e99b Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
156 3a19e99b Sofia Papagiannaki
157 3a19e99b Sofia Papagiannaki
        vserial, _ = l2[-2]
158 3a19e99b Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159 6ee6677e Sofia Papagiannaki
                         {'Quality': 'AAA'})
160 3a19e99b Sofia Papagiannaki
161 3a19e99b Sofia Papagiannaki
        # update data
162 3a19e99b Sofia Papagiannaki
        self.append_object_data(c, o)
163 3a19e99b Sofia Papagiannaki
164 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
165 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
166 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
167 3a19e99b Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
168 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
169 3a19e99b Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
170 3a19e99b Sofia Papagiannaki
171 adce84cd Sofia Papagiannaki
    def test_get_version(self):
172 adce84cd Sofia Papagiannaki
        c = 'c1'
173 adce84cd Sofia Papagiannaki
        o = self.objects[c][0]
174 adce84cd Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
175 adce84cd Sofia Papagiannaki
176 adce84cd Sofia Papagiannaki
        # Update metadata
177 adce84cd Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178 adce84cd Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
179 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
180 adce84cd Sofia Papagiannaki
181 adce84cd Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
182 adce84cd Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
183 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
184 adce84cd Sofia Papagiannaki
        l = json.loads(r.content)['versions']
185 adce84cd Sofia Papagiannaki
        self.assertEqual(len(l), 2)
186 adce84cd Sofia Papagiannaki
187 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, l[0][0]))
188 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
189 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Quality' not in r)
190 adce84cd Sofia Papagiannaki
191 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, l[1][0]))
192 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
193 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Quality' in r)
194 adce84cd Sofia Papagiannaki
195 adce84cd Sofia Papagiannaki
        # test invalid version
196 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=-1' % url)
197 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
198 adce84cd Sofia Papagiannaki
199 adce84cd Sofia Papagiannaki
        other_name, other_data, r = self.upload_object(c)
200 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
201 adce84cd Sofia Papagiannaki
        other_version = r['X-Object-Version']
202 adce84cd Sofia Papagiannaki
203 adce84cd Sofia Papagiannaki
        self.assertTrue(o != other_name)
204 adce84cd Sofia Papagiannaki
205 adce84cd Sofia Papagiannaki
        r = self.get('%s?version=%s' % (url, other_version))
206 6ee6677e Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
207 adce84cd Sofia Papagiannaki
208 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, other_version))
209 6ee6677e Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
210 adce84cd Sofia Papagiannaki
211 3a19e99b Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
212 3a19e99b Sofia Papagiannaki
        # create object
213 3a19e99b Sofia Papagiannaki
        oname = self.upload_object('c1')[0]
214 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215 3a19e99b Sofia Papagiannaki
216 3a19e99b Sofia Papagiannaki
        r = self.get(quote('%s ' % url))
217 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
218 3a19e99b Sofia Papagiannaki
219 3a19e99b Sofia Papagiannaki
        # delete object
220 3a19e99b Sofia Papagiannaki
        self.delete(url)
221 3a19e99b Sofia Papagiannaki
222 3a19e99b Sofia Papagiannaki
        r = self.get(url)
223 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
224 3a19e99b Sofia Papagiannaki
225 3a19e99b Sofia Papagiannaki
        # upload object with trailing space
226 bc4f25c0 Sofia Papagiannaki
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227 3a19e99b Sofia Papagiannaki
228 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229 3a19e99b Sofia Papagiannaki
        r = self.get(url)
230 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
231 3a19e99b Sofia Papagiannaki
232 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233 3a19e99b Sofia Papagiannaki
        r = self.get(url)
234 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
235 3a19e99b Sofia Papagiannaki
236 3a19e99b Sofia Papagiannaki
    def test_get_partial(self):
237 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
238 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
239 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
240 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
242 3a19e99b Sofia Papagiannaki
        data = r.content
243 3a19e99b Sofia Papagiannaki
        self.assertEqual(data, odata[:500])
244 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
245 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
247 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248 3a19e99b Sofia Papagiannaki
249 3a19e99b Sofia Papagiannaki
    def test_get_final_500(self):
250 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
251 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
252 3a19e99b Sofia Papagiannaki
        size = len(odata)
253 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
254 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=-500')
255 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
256 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[-500:])
257 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
258 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
259 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
261 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262 3a19e99b Sofia Papagiannaki
263 3a19e99b Sofia Papagiannaki
    def test_get_rest(self):
264 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
265 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
266 3a19e99b Sofia Papagiannaki
        size = len(odata)
267 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
268 3a19e99b Sofia Papagiannaki
        offset = len(odata) - random.randint(1, 512)
269 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
271 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[offset:])
272 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
273 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
274 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
276 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277 3a19e99b Sofia Papagiannaki
278 3a19e99b Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
279 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
280 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
281 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
282 3a19e99b Sofia Papagiannaki
283 3a19e99b Sofia Papagiannaki
        # TODO
284 3a19e99b Sofia Papagiannaki
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
286 3a19e99b Sofia Papagiannaki
287 3a19e99b Sofia Papagiannaki
        offset = len(odata) + 1
288 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
290 3a19e99b Sofia Papagiannaki
291 3a19e99b Sofia Papagiannaki
    def test_multiple_range(self):
292 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
293 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
294 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
295 3a19e99b Sofia Papagiannaki
296 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
297 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
298 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
299 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
300 3a19e99b Sofia Papagiannaki
        self.assertTrue('content-type' in r)
301 3a19e99b Sofia Papagiannaki
        p = re.compile(
302 3a19e99b Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303 3a19e99b Sofia Papagiannaki
            re.I)
304 3a19e99b Sofia Papagiannaki
        m = p.match(r['content-type'])
305 3a19e99b Sofia Papagiannaki
        if m is None:
306 3a19e99b Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
307 3a19e99b Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
308 3a19e99b Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
309 3a19e99b Sofia Papagiannaki
310 3a19e99b Sofia Papagiannaki
        # assert content parts length
311 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
312 3a19e99b Sofia Papagiannaki
313 3a19e99b Sofia Papagiannaki
        # for each content part assert headers
314 3a19e99b Sofia Papagiannaki
        i = 0
315 3a19e99b Sofia Papagiannaki
        for cpart in cparts:
316 3a19e99b Sofia Papagiannaki
            content = cpart.split('\r\n')
317 3a19e99b Sofia Papagiannaki
            headers = content[1:3]
318 3a19e99b Sofia Papagiannaki
            content_range = headers[0].split(': ')
319 3a19e99b Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
320 3a19e99b Sofia Papagiannaki
321 3a19e99b Sofia Papagiannaki
            r = l[i].split('-')
322 3a19e99b Sofia Papagiannaki
            if not r[0] and not r[1]:
323 3a19e99b Sofia Papagiannaki
                pass
324 3a19e99b Sofia Papagiannaki
            elif not r[0]:
325 3a19e99b Sofia Papagiannaki
                start = len(odata) - int(r[1])
326 3a19e99b Sofia Papagiannaki
                end = len(odata)
327 3a19e99b Sofia Papagiannaki
            elif not r[1]:
328 3a19e99b Sofia Papagiannaki
                start = int(r[0])
329 3a19e99b Sofia Papagiannaki
                end = len(odata)
330 3a19e99b Sofia Papagiannaki
            else:
331 3a19e99b Sofia Papagiannaki
                start = int(r[0])
332 3a19e99b Sofia Papagiannaki
                end = int(r[1]) + 1
333 3a19e99b Sofia Papagiannaki
            fdata = odata[start:end]
334 3a19e99b Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
335 3a19e99b Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
336 3a19e99b Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
337 3a19e99b Sofia Papagiannaki
            i += 1
338 3a19e99b Sofia Papagiannaki
339 3a19e99b Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
340 3a19e99b Sofia Papagiannaki
        # perform get with multiple range
341 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
342 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
343 3a19e99b Sofia Papagiannaki
        out_of_range = len(odata) + 1
344 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
345 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
346 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
347 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
348 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
349 3a19e99b Sofia Papagiannaki
350 5fe43b8c Sofia Papagiannaki
    def test_get_if_match(self):
351 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
352 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
353 3a19e99b Sofia Papagiannaki
354 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
355 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
356 3a19e99b Sofia Papagiannaki
357 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
358 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
359 3a19e99b Sofia Papagiannaki
        else:
360 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
361 3a19e99b Sofia Papagiannaki
362 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=etag)
363 3a19e99b Sofia Papagiannaki
364 3a19e99b Sofia Papagiannaki
        # assert get success
365 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
366 3a19e99b Sofia Papagiannaki
367 3a19e99b Sofia Papagiannaki
        # assert response content
368 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
369 3a19e99b Sofia Papagiannaki
370 5fe43b8c Sofia Papagiannaki
    def test_get_if_match_star(self):
371 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
372 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
373 3a19e99b Sofia Papagiannaki
374 3a19e99b Sofia Papagiannaki
        # perform get with If-Match *
375 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
376 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH='*')
377 3a19e99b Sofia Papagiannaki
378 3a19e99b Sofia Papagiannaki
        # assert get success
379 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
380 3a19e99b Sofia Papagiannaki
381 3a19e99b Sofia Papagiannaki
        # assert response content
382 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
383 3a19e99b Sofia Papagiannaki
384 5fe43b8c Sofia Papagiannaki
    def test_get_multiple_if_match(self):
385 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
386 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
387 3a19e99b Sofia Papagiannaki
388 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
389 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
390 3a19e99b Sofia Papagiannaki
391 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
392 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
393 3a19e99b Sofia Papagiannaki
        else:
394 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
395 3a19e99b Sofia Papagiannaki
396 5fe43b8c Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
397 5fe43b8c Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=','.join(
398 5fe43b8c Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
399 3a19e99b Sofia Papagiannaki
400 3a19e99b Sofia Papagiannaki
        # assert get success
401 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
402 3a19e99b Sofia Papagiannaki
403 3a19e99b Sofia Papagiannaki
        # assert response content
404 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
405 3a19e99b Sofia Papagiannaki
406 3a19e99b Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
407 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
408 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
409 3a19e99b Sofia Papagiannaki
410 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
411 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
412 bc4f25c0 Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
414 3a19e99b Sofia Papagiannaki
415 a1f429b2 Sofia Papagiannaki
    def test_if_none_match(self):
416 3a19e99b Sofia Papagiannaki
        # upload object
417 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
418 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
419 3a19e99b Sofia Papagiannaki
420 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
421 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
422 3a19e99b Sofia Papagiannaki
        else:
423 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
424 3a19e99b Sofia Papagiannaki
425 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
426 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
427 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428 3a19e99b Sofia Papagiannaki
429 3a19e99b Sofia Papagiannaki
        # assert precondition_failed
430 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
431 3a19e99b Sofia Papagiannaki
432 3a19e99b Sofia Papagiannaki
        # update object data
433 3a19e99b Sofia Papagiannaki
        r = self.append_object_data(cname, oname)[-1]
434 3a19e99b Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
435 3a19e99b Sofia Papagiannaki
436 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
437 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
438 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439 3a19e99b Sofia Papagiannaki
440 3a19e99b Sofia Papagiannaki
        # assert get success
441 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
442 3a19e99b Sofia Papagiannaki
443 a1f429b2 Sofia Papagiannaki
    def test_if_none_match_star(self):
444 3a19e99b Sofia Papagiannaki
        # upload object
445 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
446 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
447 3a19e99b Sofia Papagiannaki
448 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match with star
449 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
450 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451 3a19e99b Sofia Papagiannaki
452 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
453 3a19e99b Sofia Papagiannaki
454 3a19e99b Sofia Papagiannaki
    def test_if_modified_since(self):
455 3a19e99b Sofia Papagiannaki
        # upload object
456 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
457 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
458 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
459 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
460 3a19e99b Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
462 3a19e99b Sofia Papagiannaki
463 3a19e99b Sofia Papagiannaki
        # Check not modified since
464 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
465 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
466 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
468 3a19e99b Sofia Papagiannaki
469 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
470 3a19e99b Sofia Papagiannaki
471 3a19e99b Sofia Papagiannaki
        # update object data
472 3a19e99b Sofia Papagiannaki
        appended_data = self.append_object_data(cname, oname)[1]
473 3a19e99b Sofia Papagiannaki
474 3a19e99b Sofia Papagiannaki
        # Check modified since
475 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
476 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
477 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
479 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata + appended_data)
480 3a19e99b Sofia Papagiannaki
481 3a19e99b Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
482 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
483 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
484 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
485 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
487 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
488 3a19e99b Sofia Papagiannaki
489 3a19e99b Sofia Papagiannaki
    def test_if_not_modified_since(self):
490 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
491 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
492 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
493 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
494 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
495 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496 3a19e99b Sofia Papagiannaki
497 3a19e99b Sofia Papagiannaki
        # Check unmodified
498 3a19e99b Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
499 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
500 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
501 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
503 2a194295 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
504 3a19e99b Sofia Papagiannaki
505 3a19e99b Sofia Papagiannaki
        # modify object
506 3a19e99b Sofia Papagiannaki
        _time.sleep(2)
507 3a19e99b Sofia Papagiannaki
        self.append_object_data(cname, oname)
508 3a19e99b Sofia Papagiannaki
509 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
510 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
511 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512 3a19e99b Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
513 3a19e99b Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
514 3a19e99b Sofia Papagiannaki
515 5fe43b8c Sofia Papagiannaki
        # check modified
516 3a19e99b Sofia Papagiannaki
        for t in t2_formats:
517 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
519 3a19e99b Sofia Papagiannaki
520 800af189 Sofia Papagiannaki
        # modify account: update object meta
521 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
522 3a19e99b Sofia Papagiannaki
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523 3a19e99b Sofia Papagiannaki
524 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
525 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
526 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527 3a19e99b Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
528 3a19e99b Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
529 3a19e99b Sofia Papagiannaki
530 5fe43b8c Sofia Papagiannaki
        # check modified
531 3a19e99b Sofia Papagiannaki
        for t in t3_formats:
532 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
534 3a19e99b Sofia Papagiannaki
535 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since(self):
536 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
537 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
538 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
539 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
540 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
541 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542 3a19e99b Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
543 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
544 3a19e99b Sofia Papagiannaki
545 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
546 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
548 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata)
549 3a19e99b Sofia Papagiannaki
550 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
551 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
552 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
553 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
554 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
555 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
556 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557 3a19e99b Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
558 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
559 3a19e99b Sofia Papagiannaki
560 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
561 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
563 3a19e99b Sofia Papagiannaki
564 3a19e99b Sofia Papagiannaki
    def test_hashes(self):
565 3a19e99b Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
567 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=l)[:-1]
568 3a19e99b Sofia Papagiannaki
        size = len(odata)
569 3a19e99b Sofia Papagiannaki
570 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
571 3a19e99b Sofia Papagiannaki
        r = self.get('%s?format=json&hashmap' % url)
572 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
573 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
574 3a19e99b Sofia Papagiannaki
575 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
576 3a19e99b Sofia Papagiannaki
        block_size = body['block_size']
577 3a19e99b Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
578 3a19e99b Sofia Papagiannaki
            size / block_size + 1
579 3a19e99b Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
580 3a19e99b Sofia Papagiannaki
        i = 0
581 3a19e99b Sofia Papagiannaki
        for h in hashes:
582 3a19e99b Sofia Papagiannaki
            start = i * block_size
583 3a19e99b Sofia Papagiannaki
            end = (i + 1) * block_size
584 5fe43b8c Sofia Papagiannaki
            hash = merkle(odata[start:end])
585 3a19e99b Sofia Papagiannaki
            self.assertEqual(h, hash)
586 3a19e99b Sofia Papagiannaki
            i += 1
587 3a19e99b Sofia Papagiannaki
588 3a19e99b Sofia Papagiannaki
589 3a19e99b Sofia Papagiannaki
class ObjectPut(PithosAPITest):
590 3a19e99b Sofia Papagiannaki
    def setUp(self):
591 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
592 bc4f25c0 Sofia Papagiannaki
        self.container = get_random_name()
593 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
594 3a19e99b Sofia Papagiannaki
595 3a19e99b Sofia Papagiannaki
    def test_upload(self):
596 3a19e99b Sofia Papagiannaki
        cname = self.container
597 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
598 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
599 3a19e99b Sofia Papagiannaki
        meta = {'test': 'test1'}
600 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
602 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
603 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, content_type='application/pdf', **headers)
604 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
605 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
606 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
607 3a19e99b Sofia Papagiannaki
608 3a19e99b Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
609 3a19e99b Sofia Papagiannaki
610 3a19e99b Sofia Papagiannaki
        # assert object meta
611 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in info)
612 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
613 3a19e99b Sofia Papagiannaki
614 3a19e99b Sofia Papagiannaki
        # assert content-type
615 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['content-type'], 'application/pdf')
616 3a19e99b Sofia Papagiannaki
617 3a19e99b Sofia Papagiannaki
        # assert uploaded content
618 3a19e99b Sofia Papagiannaki
        r = self.get(url)
619 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
620 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
621 3a19e99b Sofia Papagiannaki
622 3a19e99b Sofia Papagiannaki
    def test_maximum_upload_size_exceeds(self):
623 3a19e99b Sofia Papagiannaki
        cname = self.container
624 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
625 3a19e99b Sofia Papagiannaki
626 3a19e99b Sofia Papagiannaki
        # set container quota to 100
627 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname)
628 3a19e99b Sofia Papagiannaki
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
629 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
630 3a19e99b Sofia Papagiannaki
631 3a19e99b Sofia Papagiannaki
        info = self.get_container_info(cname)
632 3a19e99b Sofia Papagiannaki
        length = int(info['X-Container-Policy-Quota']) + 1
633 3a19e99b Sofia Papagiannaki
634 3a19e99b Sofia Papagiannaki
        data = get_random_data(length)
635 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
636 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
637 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 413)
638 3a19e99b Sofia Papagiannaki
639 3a19e99b Sofia Papagiannaki
    def test_upload_with_name_containing_slash(self):
640 3a19e99b Sofia Papagiannaki
        cname = self.container
641 bc4f25c0 Sofia Papagiannaki
        oname = '/%s' % get_random_name()
642 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
643 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
644 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
645 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
646 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
647 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
648 3a19e99b Sofia Papagiannaki
649 3a19e99b Sofia Papagiannaki
        r = self.get(url)
650 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
651 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
652 3a19e99b Sofia Papagiannaki
653 3a19e99b Sofia Papagiannaki
    def test_upload_unprocessable_entity(self):
654 3a19e99b Sofia Papagiannaki
        cname = self.container
655 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
656 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
657 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
658 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, HTTP_ETAG='123')
659 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 422)
660 3a19e99b Sofia Papagiannaki
661 3a19e99b Sofia Papagiannaki
#    def test_chunked_transfer(self):
662 3a19e99b Sofia Papagiannaki
#        cname = self.container
663 bc4f25c0 Sofia Papagiannaki
#        oname = '/%s' % get_random_name()
664 5fe43b8c Sofia Papagiannaki
#        data = get_random_data()
665 3a19e99b Sofia Papagiannaki
#        url = join_urls(self.pithos_path, self.user, cname, oname)
666 3a19e99b Sofia Papagiannaki
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
667 3a19e99b Sofia Papagiannaki
#        self.assertEqual(r.status_code, 201)
668 3a19e99b Sofia Papagiannaki
#        self.assertTrue('ETag' in r)
669 3a19e99b Sofia Papagiannaki
#        self.assertTrue('X-Object-Version' in r)
670 3a19e99b Sofia Papagiannaki
671 3a19e99b Sofia Papagiannaki
    def test_manifestation(self):
672 3a19e99b Sofia Papagiannaki
        cname = self.container
673 3a19e99b Sofia Papagiannaki
        prefix = 'myobject/'
674 3a19e99b Sofia Papagiannaki
        data = ''
675 3a19e99b Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
676 3a19e99b Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
677 3a19e99b Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
678 3a19e99b Sofia Papagiannaki
679 3a19e99b Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
680 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
681 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
682 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
683 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
684 3a19e99b Sofia Papagiannaki
685 3a19e99b Sofia Papagiannaki
        # assert object exists
686 3a19e99b Sofia Papagiannaki
        r = self.get(url)
687 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
688 3a19e99b Sofia Papagiannaki
689 3a19e99b Sofia Papagiannaki
        # assert its content
690 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
691 3a19e99b Sofia Papagiannaki
692 3a19e99b Sofia Papagiannaki
        # invalid manifestation
693 bc4f25c0 Sofia Papagiannaki
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
694 3a19e99b Sofia Papagiannaki
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
695 3a19e99b Sofia Papagiannaki
        r = self.get(url)
696 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
697 3a19e99b Sofia Papagiannaki
698 3a19e99b Sofia Papagiannaki
    def test_create_zero_length_object(self):
699 3a19e99b Sofia Papagiannaki
        cname = self.container
700 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
701 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
702 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
703 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
704 3a19e99b Sofia Papagiannaki
705 3a19e99b Sofia Papagiannaki
        r = self.get(url)
706 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
707 3a19e99b Sofia Papagiannaki
        self.assertEqual(int(r['Content-Length']), 0)
708 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
709 3a19e99b Sofia Papagiannaki
710 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
711 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
712 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
713 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
714 5fe43b8c Sofia Papagiannaki
        hash = merkle('')
715 3a19e99b Sofia Papagiannaki
        self.assertEqual(hashes, [hash])
716 3a19e99b Sofia Papagiannaki
717 3a19e99b Sofia Papagiannaki
    def test_create_object_by_hashmap(self):
718 3a19e99b Sofia Papagiannaki
        cname = self.container
719 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
720 3a19e99b Sofia Papagiannaki
721 3a19e99b Sofia Papagiannaki
        # upload an object
722 3a19e99b Sofia Papagiannaki
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
723 3a19e99b Sofia Papagiannaki
        # get it hashmap
724 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
725 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
726 3a19e99b Sofia Papagiannaki
727 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
728 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
729 3a19e99b Sofia Papagiannaki
        r = self.put('%s?hashmap=' % url, data=r.content)
730 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
731 3a19e99b Sofia Papagiannaki
732 3a19e99b Sofia Papagiannaki
        r = self.get(url)
733 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
734 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
735 3a19e99b Sofia Papagiannaki
736 3a5994a8 Sofia Papagiannaki
    def test_create_object_by_invalid_hashmap(self):
737 3a5994a8 Sofia Papagiannaki
        cname = self.container
738 3a5994a8 Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
739 3a5994a8 Sofia Papagiannaki
740 3a5994a8 Sofia Papagiannaki
        # upload an object
741 3a5994a8 Sofia Papagiannaki
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
742 3a5994a8 Sofia Papagiannaki
        # get it hashmap
743 3a5994a8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
744 3a5994a8 Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
745 3a5994a8 Sofia Papagiannaki
        data = r.content
746 3a5994a8 Sofia Papagiannaki
        try:
747 3a5994a8 Sofia Papagiannaki
            hashmap = json.loads(data)
748 3a5994a8 Sofia Papagiannaki
        except:
749 3a5994a8 Sofia Papagiannaki
            self.fail('JSON format expected')
750 3a5994a8 Sofia Papagiannaki
751 3a5994a8 Sofia Papagiannaki
        oname = get_random_name()
752 3a5994a8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
753 3a5994a8 Sofia Papagiannaki
        hashmap['hashes'] = [get_random_name()]
754 3a5994a8 Sofia Papagiannaki
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
755 3a5994a8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
756 3a5994a8 Sofia Papagiannaki
757 3a19e99b Sofia Papagiannaki
758 f53483d8 Sofia Papagiannaki
class ObjectPutCopy(PithosAPITest):
759 3a19e99b Sofia Papagiannaki
    def setUp(self):
760 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
761 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
762 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
763 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
764 3a19e99b Sofia Papagiannaki
765 3a19e99b Sofia Papagiannaki
        url = join_urls(
766 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
767 3a19e99b Sofia Papagiannaki
        r = self.head(url)
768 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
769 3a19e99b Sofia Papagiannaki
770 3a19e99b Sofia Papagiannaki
    def test_copy(self):
771 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
772 3a19e99b Sofia Papagiannaki
                                    self.object):
773 3a19e99b Sofia Papagiannaki
            # copy object
774 bc4f25c0 Sofia Papagiannaki
            oname = get_random_name()
775 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, oname)
776 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
777 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
778 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
779 3a19e99b Sofia Papagiannaki
780 3a19e99b Sofia Papagiannaki
            # assert copy success
781 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
782 3a19e99b Sofia Papagiannaki
783 3a19e99b Sofia Papagiannaki
            # assert access the new object
784 3a19e99b Sofia Papagiannaki
            r = self.head(url)
785 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
786 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
787 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
788 3a19e99b Sofia Papagiannaki
789 3a19e99b Sofia Papagiannaki
            # assert etag is the same
790 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
791 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
792 3a19e99b Sofia Papagiannaki
793 3a19e99b Sofia Papagiannaki
    def test_copy_from_different_container(self):
794 3a19e99b Sofia Papagiannaki
        cname = 'c2'
795 3a19e99b Sofia Papagiannaki
        self.create_container(cname)
796 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
797 3a19e99b Sofia Papagiannaki
                                    self.object):
798 bc4f25c0 Sofia Papagiannaki
            oname = get_random_name()
799 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname, oname)
800 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
801 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
802 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
803 3a19e99b Sofia Papagiannaki
804 3a19e99b Sofia Papagiannaki
            # assert copy success
805 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
806 3a19e99b Sofia Papagiannaki
807 3a19e99b Sofia Papagiannaki
            # assert access the new object
808 3a19e99b Sofia Papagiannaki
            r = self.head(url)
809 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
810 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
811 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
812 3a19e99b Sofia Papagiannaki
813 3a19e99b Sofia Papagiannaki
            # assert etag is the same
814 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
815 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
816 3a19e99b Sofia Papagiannaki
817 f53483d8 Sofia Papagiannaki
    def test_copy_from_other_account(self):
818 f53483d8 Sofia Papagiannaki
        cname = 'c2'
819 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='chuck')
820 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='alice')
821 f53483d8 Sofia Papagiannaki
822 f53483d8 Sofia Papagiannaki
        # share object for read with alice
823 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
824 f53483d8 Sofia Papagiannaki
                        self.object)
825 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
826 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=alice')
827 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
828 f53483d8 Sofia Papagiannaki
829 f53483d8 Sofia Papagiannaki
        # assert not allowed for chuck
830 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
831 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
832 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='chuck',
833 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
834 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
835 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
836 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
837 f53483d8 Sofia Papagiannaki
838 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
839 f53483d8 Sofia Papagiannaki
840 f53483d8 Sofia Papagiannaki
        # assert copy success for alice
841 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
842 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
843 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
844 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
845 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
846 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
847 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
848 f53483d8 Sofia Papagiannaki
849 f53483d8 Sofia Papagiannaki
        # assert access the new object
850 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
851 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
852 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
853 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
854 f53483d8 Sofia Papagiannaki
855 f53483d8 Sofia Papagiannaki
        # assert etag is the same
856 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
857 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
858 f53483d8 Sofia Papagiannaki
859 f53483d8 Sofia Papagiannaki
        # share object for write
860 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
861 f53483d8 Sofia Papagiannaki
                        self.object)
862 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
863 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=dan')
864 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
865 f53483d8 Sofia Papagiannaki
866 f53483d8 Sofia Papagiannaki
        # assert not allowed copy for alice
867 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
868 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
869 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
870 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
871 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
872 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
873 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
874 f53483d8 Sofia Papagiannaki
875 f53483d8 Sofia Papagiannaki
        # assert allowed copy for dan
876 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='dan')
877 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'dan', cname, oname)
878 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='dan',
879 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
880 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
881 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
882 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
883 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
884 f53483d8 Sofia Papagiannaki
885 f53483d8 Sofia Papagiannaki
        # assert access the new object
886 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='dan')
887 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
888 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
889 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
890 f53483d8 Sofia Papagiannaki
891 f53483d8 Sofia Papagiannaki
        # assert etag is the same
892 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
893 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
894 f53483d8 Sofia Papagiannaki
895 f53483d8 Sofia Papagiannaki
        # assert source object still exists
896 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
897 f53483d8 Sofia Papagiannaki
                        self.object)
898 f53483d8 Sofia Papagiannaki
        r = self.head(url)
899 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
900 f53483d8 Sofia Papagiannaki
        # assert etag is the same
901 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
902 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
903 f53483d8 Sofia Papagiannaki
904 f53483d8 Sofia Papagiannaki
        r = self.get(url)
905 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
906 f53483d8 Sofia Papagiannaki
        # assert etag is the same
907 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
908 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
909 f53483d8 Sofia Papagiannaki
910 3a19e99b Sofia Papagiannaki
    def test_copy_invalid(self):
911 3a19e99b Sofia Papagiannaki
        # copy from non-existent object
912 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
913 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
914 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
915 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
916 bc4f25c0 Sofia Papagiannaki
                         self.container, get_random_name()))
917 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
918 3a19e99b Sofia Papagiannaki
919 3a19e99b Sofia Papagiannaki
        # copy from non-existent container
920 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
921 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
922 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
923 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
924 bc4f25c0 Sofia Papagiannaki
                         get_random_name(), self.object))
925 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
926 3a19e99b Sofia Papagiannaki
927 3a19e99b Sofia Papagiannaki
    def test_copy_dir(self):
928 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
929 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
930 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
931 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
932 3a19e99b Sofia Papagiannaki
        append = objects.append
933 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
934 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_name()),
935 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
936 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
937 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_name()),
938 6ee6677e Sofia Papagiannaki
                                  depth='2')[0])
939 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
940 3a19e99b Sofia Papagiannaki
941 3a19e99b Sofia Papagiannaki
        # copy dir
942 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
943 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
944 3a19e99b Sofia Papagiannaki
                        copy_folder)
945 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
946 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
947 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
948 3a19e99b Sofia Papagiannaki
949 3a19e99b Sofia Papagiannaki
        for obj in objects:
950 3a19e99b Sofia Papagiannaki
            # assert object exists
951 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
952 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
953 3a19e99b Sofia Papagiannaki
            r = self.head(url)
954 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
955 3a19e99b Sofia Papagiannaki
956 3a19e99b Sofia Papagiannaki
            # assert metadata
957 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, obj)
958 3a19e99b Sofia Papagiannaki
            for k in meta.keys():
959 6ee6677e Sofia Papagiannaki
                key = 'X-Object-Meta-%s' % k
960 6ee6677e Sofia Papagiannaki
                self.assertTrue(key in r)
961 6ee6677e Sofia Papagiannaki
                self.assertEqual(r[key], meta[k])
962 3a19e99b Sofia Papagiannaki
963 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
964 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
965 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
966 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
967 3a19e99b Sofia Papagiannaki
        r = self.head(url)
968 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
969 3a19e99b Sofia Papagiannaki
970 3a19e99b Sofia Papagiannaki
971 f53483d8 Sofia Papagiannaki
class ObjectPutMove(PithosAPITest):
972 3a19e99b Sofia Papagiannaki
    def setUp(self):
973 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
974 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
975 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
976 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
977 3a19e99b Sofia Papagiannaki
978 3a19e99b Sofia Papagiannaki
        url = join_urls(
979 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
980 3a19e99b Sofia Papagiannaki
        r = self.head(url)
981 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
982 3a19e99b Sofia Papagiannaki
983 3a19e99b Sofia Papagiannaki
    def test_move(self):
984 3a19e99b Sofia Papagiannaki
        # move object
985 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
986 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
987 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
988 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
989 3a19e99b Sofia Papagiannaki
                         self.container, self.object))
990 3a19e99b Sofia Papagiannaki
991 3a19e99b Sofia Papagiannaki
        # assert move success
992 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
993 3a19e99b Sofia Papagiannaki
994 3a19e99b Sofia Papagiannaki
        # assert access the new object
995 3a19e99b Sofia Papagiannaki
        r = self.head(url)
996 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
997 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
998 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
999 3a19e99b Sofia Papagiannaki
1000 3a19e99b Sofia Papagiannaki
        # assert etag is the same
1001 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1002 3a19e99b Sofia Papagiannaki
1003 3a19e99b Sofia Papagiannaki
        # assert the initial object has been deleted
1004 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1005 3a19e99b Sofia Papagiannaki
                        self.object)
1006 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1007 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1008 3a19e99b Sofia Papagiannaki
1009 3a19e99b Sofia Papagiannaki
    def test_move_dir(self):
1010 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1011 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
1012 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1013 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
1014 3a19e99b Sofia Papagiannaki
        append = objects.append
1015 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1016 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_name()),
1017 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
1018 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1019 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_name()),
1020 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
1021 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1022 3a19e99b Sofia Papagiannaki
1023 3a19e99b Sofia Papagiannaki
        # move dir
1024 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
1025 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1026 3a19e99b Sofia Papagiannaki
                        copy_folder)
1027 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
1028 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1029 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1030 3a19e99b Sofia Papagiannaki
1031 3a19e99b Sofia Papagiannaki
        for obj in objects:
1032 3a19e99b Sofia Papagiannaki
            # assert initial object does not exist
1033 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1034 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1035 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1036 3a19e99b Sofia Papagiannaki
1037 3a19e99b Sofia Papagiannaki
            # assert new object was created
1038 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1039 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
1040 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1041 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1042 3a19e99b Sofia Papagiannaki
1043 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
1044 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1045 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
1046 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
1047 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1048 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1049 3a19e99b Sofia Papagiannaki
1050 f53483d8 Sofia Papagiannaki
    def test_move_from_other_account(self):
1051 f53483d8 Sofia Papagiannaki
        cname = 'c2'
1052 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='chuck')
1053 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='alice')
1054 f53483d8 Sofia Papagiannaki
1055 f53483d8 Sofia Papagiannaki
        # share object for read with alice
1056 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1057 f53483d8 Sofia Papagiannaki
                        self.object)
1058 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1059 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=alice')
1060 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1061 f53483d8 Sofia Papagiannaki
1062 f53483d8 Sofia Papagiannaki
        # assert not allowed move for chuck
1063 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1064 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1065 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='chuck',
1066 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1067 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1068 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1069 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1070 f53483d8 Sofia Papagiannaki
1071 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1072 f53483d8 Sofia Papagiannaki
1073 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1074 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='chuck')
1075 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1076 f53483d8 Sofia Papagiannaki
1077 f53483d8 Sofia Papagiannaki
        # assert not allowed move for alice
1078 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1079 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
1080 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1081 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1082 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1083 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1084 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1085 f53483d8 Sofia Papagiannaki
1086 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1087 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
1088 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1089 f53483d8 Sofia Papagiannaki
1090 f53483d8 Sofia Papagiannaki
        # share object for write with dan
1091 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1092 f53483d8 Sofia Papagiannaki
                        self.object)
1093 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1094 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=dan')
1095 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1096 f53483d8 Sofia Papagiannaki
1097 f53483d8 Sofia Papagiannaki
        # assert not allowed move for alice
1098 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1099 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
1100 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1101 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1102 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1103 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1104 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1105 f53483d8 Sofia Papagiannaki
1106 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1107 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
1108 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1109 f53483d8 Sofia Papagiannaki
1110 f53483d8 Sofia Papagiannaki
        # assert not allowed move for dan
1111 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='dan')
1112 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1113 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='dan',
1114 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1115 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1116 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1117 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1118 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1119 f53483d8 Sofia Papagiannaki
1120 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1121 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='dan')
1122 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1123 f53483d8 Sofia Papagiannaki
1124 f53483d8 Sofia Papagiannaki
1125 f53483d8 Sofia Papagiannaki
class ObjectCopy(PithosAPITest):
1126 f53483d8 Sofia Papagiannaki
    def setUp(self):
1127 f53483d8 Sofia Papagiannaki
        PithosAPITest.setUp(self)
1128 f53483d8 Sofia Papagiannaki
        self.container = 'c1'
1129 f53483d8 Sofia Papagiannaki
        self.create_container(self.container)
1130 f53483d8 Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
1131 f53483d8 Sofia Papagiannaki
1132 f53483d8 Sofia Papagiannaki
        url = join_urls(
1133 f53483d8 Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
1134 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1135 f53483d8 Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
1136 f53483d8 Sofia Papagiannaki
1137 f53483d8 Sofia Papagiannaki
    def test_copy(self):
1138 f53483d8 Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
1139 f53483d8 Sofia Papagiannaki
                                    self.object):
1140 f53483d8 Sofia Papagiannaki
            oname = get_random_name()
1141 f53483d8 Sofia Papagiannaki
            # copy object
1142 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1143 f53483d8 Sofia Papagiannaki
                            self.object)
1144 f53483d8 Sofia Papagiannaki
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1145 f53483d8 Sofia Papagiannaki
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1146 f53483d8 Sofia Papagiannaki
                                                       oname))
1147 f53483d8 Sofia Papagiannaki
            # assert copy success
1148 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1149 f53483d8 Sofia Papagiannaki
                            oname)
1150 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
1151 f53483d8 Sofia Papagiannaki
1152 f53483d8 Sofia Papagiannaki
            # assert access the new object
1153 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1154 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1155 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
1156 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1157 f53483d8 Sofia Papagiannaki
1158 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1159 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1160 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1161 f53483d8 Sofia Papagiannaki
1162 f53483d8 Sofia Papagiannaki
            # assert source object still exists
1163 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1164 f53483d8 Sofia Papagiannaki
                            self.object)
1165 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1166 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1167 f53483d8 Sofia Papagiannaki
1168 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1169 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1170 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1171 f53483d8 Sofia Papagiannaki
1172 f53483d8 Sofia Papagiannaki
            r = self.get(url)
1173 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1174 f53483d8 Sofia Papagiannaki
1175 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1176 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1177 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1178 f53483d8 Sofia Papagiannaki
1179 f53483d8 Sofia Papagiannaki
            # copy object to other container (not existing)
1180 f53483d8 Sofia Papagiannaki
            cname = get_random_name()
1181 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1182 f53483d8 Sofia Papagiannaki
                            self.object)
1183 f53483d8 Sofia Papagiannaki
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1184 f53483d8 Sofia Papagiannaki
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1185 f53483d8 Sofia Papagiannaki
1186 f53483d8 Sofia Papagiannaki
            # assert destination container does not exist
1187 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname,
1188 f53483d8 Sofia Papagiannaki
                            self.object)
1189 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1190 f53483d8 Sofia Papagiannaki
1191 f53483d8 Sofia Papagiannaki
            # create container
1192 f53483d8 Sofia Papagiannaki
            self.create_container(cname)
1193 f53483d8 Sofia Papagiannaki
1194 f53483d8 Sofia Papagiannaki
            # copy object to other container (existing)
1195 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1196 f53483d8 Sofia Papagiannaki
                            self.object)
1197 f53483d8 Sofia Papagiannaki
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1198 f53483d8 Sofia Papagiannaki
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1199 f53483d8 Sofia Papagiannaki
1200 f53483d8 Sofia Papagiannaki
            # assert copy success
1201 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname,
1202 f53483d8 Sofia Papagiannaki
                            self.object)
1203 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
1204 f53483d8 Sofia Papagiannaki
1205 f53483d8 Sofia Papagiannaki
            # assert access the new object
1206 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1207 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1208 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
1209 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1210 f53483d8 Sofia Papagiannaki
1211 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1212 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1213 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1214 f53483d8 Sofia Papagiannaki
1215 f53483d8 Sofia Papagiannaki
                        # assert source object still exists
1216 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1217 f53483d8 Sofia Papagiannaki
                            self.object)
1218 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1219 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1220 f53483d8 Sofia Papagiannaki
1221 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1222 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1223 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1224 f53483d8 Sofia Papagiannaki
1225 f53483d8 Sofia Papagiannaki
            r = self.get(url)
1226 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1227 f53483d8 Sofia Papagiannaki
1228 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1229 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1230 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1231 f53483d8 Sofia Papagiannaki
1232 f53483d8 Sofia Papagiannaki
    def test_copy_to_other_account(self):
1233 f53483d8 Sofia Papagiannaki
        # create a container under alice account
1234 f53483d8 Sofia Papagiannaki
        cname = self.create_container(user='alice')[0]
1235 f53483d8 Sofia Papagiannaki
1236 f53483d8 Sofia Papagiannaki
        # create a folder under this container
1237 f53483d8 Sofia Papagiannaki
        folder = self.create_folder(cname, user='alice')[0]
1238 f53483d8 Sofia Papagiannaki
1239 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1240 f53483d8 Sofia Papagiannaki
1241 f53483d8 Sofia Papagiannaki
        # copy object to other account container
1242 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1243 f53483d8 Sofia Papagiannaki
                        self.object)
1244 f53483d8 Sofia Papagiannaki
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1245 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1246 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1247 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1248 f53483d8 Sofia Papagiannaki
1249 f53483d8 Sofia Papagiannaki
        # share object for read with user
1250 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1251 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice', content_type='',
1252 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1253 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1254 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1255 f53483d8 Sofia Papagiannaki
1256 f53483d8 Sofia Papagiannaki
        # assert copy object still is not allowed
1257 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1258 f53483d8 Sofia Papagiannaki
                        self.object)
1259 f53483d8 Sofia Papagiannaki
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1260 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1261 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1262 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1263 f53483d8 Sofia Papagiannaki
1264 f53483d8 Sofia Papagiannaki
        # share object for write with user
1265 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1266 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice',  content_type='',
1267 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1268 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1269 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1270 f53483d8 Sofia Papagiannaki
1271 f53483d8 Sofia Papagiannaki
        # assert copy object now is allowed
1272 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1273 f53483d8 Sofia Papagiannaki
                        self.object)
1274 f53483d8 Sofia Papagiannaki
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1275 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1276 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1277 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1278 f53483d8 Sofia Papagiannaki
1279 f53483d8 Sofia Papagiannaki
        # assert access the new object
1280 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1281 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
1282 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1283 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
1284 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1285 f53483d8 Sofia Papagiannaki
1286 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1287 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1288 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1289 f53483d8 Sofia Papagiannaki
1290 f53483d8 Sofia Papagiannaki
        # assert source object still exists
1291 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1292 f53483d8 Sofia Papagiannaki
                        self.object)
1293 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1294 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1295 f53483d8 Sofia Papagiannaki
1296 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1297 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1298 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1299 f53483d8 Sofia Papagiannaki
1300 f53483d8 Sofia Papagiannaki
        r = self.get(url)
1301 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1302 f53483d8 Sofia Papagiannaki
1303 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1304 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1305 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1306 f53483d8 Sofia Papagiannaki
1307 f53483d8 Sofia Papagiannaki
1308 f53483d8 Sofia Papagiannaki
class ObjectMove(PithosAPITest):
1309 f53483d8 Sofia Papagiannaki
    def setUp(self):
1310 f53483d8 Sofia Papagiannaki
        PithosAPITest.setUp(self)
1311 f53483d8 Sofia Papagiannaki
        self.container = 'c1'
1312 f53483d8 Sofia Papagiannaki
        self.create_container(self.container)
1313 f53483d8 Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
1314 f53483d8 Sofia Papagiannaki
1315 f53483d8 Sofia Papagiannaki
        url = join_urls(
1316 f53483d8 Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
1317 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1318 f53483d8 Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
1319 f53483d8 Sofia Papagiannaki
1320 f53483d8 Sofia Papagiannaki
    def test_move(self):
1321 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1322 f53483d8 Sofia Papagiannaki
1323 f53483d8 Sofia Papagiannaki
        # move object
1324 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1325 f53483d8 Sofia Papagiannaki
                        self.object)
1326 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1327 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1328 f53483d8 Sofia Papagiannaki
                                                   oname))
1329 f53483d8 Sofia Papagiannaki
        # assert move success
1330 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1331 f53483d8 Sofia Papagiannaki
                        oname)
1332 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1333 f53483d8 Sofia Papagiannaki
1334 f53483d8 Sofia Papagiannaki
        # assert access the new object
1335 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1336 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1337 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
1338 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1339 f53483d8 Sofia Papagiannaki
1340 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1341 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1342 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1343 f53483d8 Sofia Papagiannaki
1344 f53483d8 Sofia Papagiannaki
        # assert source object does not exist
1345 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1346 f53483d8 Sofia Papagiannaki
                        self.object)
1347 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1348 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1349 f53483d8 Sofia Papagiannaki
1350 f53483d8 Sofia Papagiannaki
    def test_move_to_other_container(self):
1351 f53483d8 Sofia Papagiannaki
        # move object to other container (not existing)
1352 f53483d8 Sofia Papagiannaki
        cname = get_random_name()
1353 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1354 f53483d8 Sofia Papagiannaki
                        self.object)
1355 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1356 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1357 f53483d8 Sofia Papagiannaki
1358 f53483d8 Sofia Papagiannaki
        # assert destination container does not exist
1359 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname,
1360 f53483d8 Sofia Papagiannaki
                        self.object)
1361 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1362 f53483d8 Sofia Papagiannaki
1363 f53483d8 Sofia Papagiannaki
        # create container
1364 f53483d8 Sofia Papagiannaki
        self.create_container(cname)
1365 f53483d8 Sofia Papagiannaki
1366 f53483d8 Sofia Papagiannaki
        # move object to other container (existing)
1367 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1368 f53483d8 Sofia Papagiannaki
                        self.object)
1369 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1370 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1371 f53483d8 Sofia Papagiannaki
1372 f53483d8 Sofia Papagiannaki
        # assert move success
1373 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname,
1374 f53483d8 Sofia Papagiannaki
                        self.object)
1375 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1376 f53483d8 Sofia Papagiannaki
1377 f53483d8 Sofia Papagiannaki
        # assert access the new object
1378 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1379 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1380 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
1381 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1382 f53483d8 Sofia Papagiannaki
1383 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1384 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1385 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1386 f53483d8 Sofia Papagiannaki
1387 f53483d8 Sofia Papagiannaki
        # assert source object does not exist
1388 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1389 f53483d8 Sofia Papagiannaki
                        self.object)
1390 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1391 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1392 f53483d8 Sofia Papagiannaki
1393 f53483d8 Sofia Papagiannaki
    def test_move_to_other_account(self):
1394 f53483d8 Sofia Papagiannaki
        # create a container under alice account
1395 f53483d8 Sofia Papagiannaki
        cname = self.create_container(user='alice')[0]
1396 f53483d8 Sofia Papagiannaki
1397 f53483d8 Sofia Papagiannaki
        # create a folder under this container
1398 f53483d8 Sofia Papagiannaki
        folder = self.create_folder(cname, user='alice')[0]
1399 f53483d8 Sofia Papagiannaki
1400 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1401 f53483d8 Sofia Papagiannaki
1402 f53483d8 Sofia Papagiannaki
        # move object to other account container
1403 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1404 f53483d8 Sofia Papagiannaki
                        self.object)
1405 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1406 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1407 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1408 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1409 f53483d8 Sofia Papagiannaki
1410 f53483d8 Sofia Papagiannaki
        # share object for read with user
1411 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1412 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice', content_type='',
1413 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1414 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1415 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1416 f53483d8 Sofia Papagiannaki
1417 f53483d8 Sofia Papagiannaki
        # assert move object still is not allowed
1418 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1419 f53483d8 Sofia Papagiannaki
                        self.object)
1420 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1421 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1422 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1423 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1424 f53483d8 Sofia Papagiannaki
1425 f53483d8 Sofia Papagiannaki
        # share object for write with user
1426 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1427 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice',  content_type='',
1428 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1429 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1430 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1431 f53483d8 Sofia Papagiannaki
1432 f53483d8 Sofia Papagiannaki
        # assert move object now is allowed
1433 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1434 f53483d8 Sofia Papagiannaki
                        self.object)
1435 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1436 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1437 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1438 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1439 f53483d8 Sofia Papagiannaki
1440 f53483d8 Sofia Papagiannaki
        # assert source object does not exist
1441 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1442 f53483d8 Sofia Papagiannaki
                        self.object)
1443 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1444 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1445 f53483d8 Sofia Papagiannaki
1446 3a19e99b Sofia Papagiannaki
1447 3a19e99b Sofia Papagiannaki
class ObjectPost(PithosAPITest):
1448 3a19e99b Sofia Papagiannaki
    def setUp(self):
1449 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
1450 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
1451 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
1452 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
1453 3a19e99b Sofia Papagiannaki
1454 3a19e99b Sofia Papagiannaki
    def test_update_meta(self):
1455 3a19e99b Sofia Papagiannaki
        with AssertUUidInvariant(self.get_object_info,
1456 3a19e99b Sofia Papagiannaki
                                 self.container,
1457 3a19e99b Sofia Papagiannaki
                                 self.object):
1458 3a19e99b Sofia Papagiannaki
            # update metadata
1459 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 256}
1460 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1461 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
1462 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1463 3a19e99b Sofia Papagiannaki
                            self.object)
1464 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
1465 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
1466 3a19e99b Sofia Papagiannaki
1467 3a19e99b Sofia Papagiannaki
            # assert metadata have been updated
1468 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, self.object)
1469 3a19e99b Sofia Papagiannaki
1470 3a19e99b Sofia Papagiannaki
            for k, v in d.items():
1471 6ee6677e Sofia Papagiannaki
                self.assertTrue(k.title() in meta)
1472 6ee6677e Sofia Papagiannaki
                self.assertTrue(meta[k.title()], v)
1473 3a19e99b Sofia Papagiannaki
1474 3a19e99b Sofia Papagiannaki
            # Header key too large
1475 3a19e99b Sofia Papagiannaki
            d = {'a' * 115: 'b' * 256}
1476 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1477 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
1478 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
1479 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
1480 3a19e99b Sofia Papagiannaki
1481 3a19e99b Sofia Papagiannaki
            # Header value too large
1482 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 257}
1483 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1484 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
1485 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
1486 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
1487 3a19e99b Sofia Papagiannaki
1488 3a19e99b Sofia Papagiannaki
#            # Check utf-8 meta
1489 3a19e99b Sofia Papagiannaki
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1490 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1491 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
1492 3a19e99b Sofia Papagiannaki
#            url = join_urls(self.pithos_path, self.user, self.container,
1493 3a19e99b Sofia Papagiannaki
#                            self.object)
1494 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
1495 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 202)
1496 3a19e99b Sofia Papagiannaki
#
1497 3a19e99b Sofia Papagiannaki
#            # assert metadata have been updated
1498 3a19e99b Sofia Papagiannaki
#            meta = self.get_object_meta(self.container, self.object)
1499 3a19e99b Sofia Papagiannaki
#
1500 3a19e99b Sofia Papagiannaki
#            for k, v in d.items():
1501 3a19e99b Sofia Papagiannaki
#                key = 'X-Object-Meta-%s' % k.title()
1502 3a19e99b Sofia Papagiannaki
#                self.assertTrue(key in meta)
1503 3a19e99b Sofia Papagiannaki
#                self.assertTrue(meta[key], v)
1504 3a19e99b Sofia Papagiannaki
#
1505 3a19e99b Sofia Papagiannaki
#            # Header key too large
1506 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * (256 / 2)}
1507 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1508 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
1509 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
1510 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
1511 3a19e99b Sofia Papagiannaki
#
1512 3a19e99b Sofia Papagiannaki
#            # Header value too large
1513 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * 256}
1514 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1515 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
1516 3a19e99b Sofia Papagiannaki
#            r = self.udpate(url, content_type='', **kwargs)
1517 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
1518 3a19e99b Sofia Papagiannaki
1519 3a19e99b Sofia Papagiannaki
    def test_update_object(self):
1520 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1521 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1522 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
1523 42c9ed6e Sofia Papagiannaki
                block_size + 2, 2 * block_size))[:2]
1524 3a19e99b Sofia Papagiannaki
1525 3a19e99b Sofia Papagiannaki
        length = len(odata)
1526 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1527 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
1528 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1529 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1530 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1531 3a19e99b Sofia Papagiannaki
1532 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1533 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1534 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1535 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1536 3a19e99b Sofia Papagiannaki
1537 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1538 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
1539 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1540 3a19e99b Sofia Papagiannaki
                                     data)
1541 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
1542 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
1543 3a19e99b Sofia Papagiannaki
        else:
1544 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
1545 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
1546 3a19e99b Sofia Papagiannaki
1547 3a19e99b Sofia Papagiannaki
        # check modified object
1548 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1549 3a19e99b Sofia Papagiannaki
1550 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1551 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
1552 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
1553 3a19e99b Sofia Papagiannaki
1554 3a19e99b Sofia Papagiannaki
    def test_update_object_divided_by_blocksize(self):
1555 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1556 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(self.container,
1557 3a19e99b Sofia Papagiannaki
                                          length=2 * block_size)[:2]
1558 3a19e99b Sofia Papagiannaki
1559 3a19e99b Sofia Papagiannaki
        length = len(odata)
1560 3a19e99b Sofia Papagiannaki
        first_byte_pos = block_size
1561 3a19e99b Sofia Papagiannaki
        last_byte_pos = 2 * block_size - 1
1562 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1563 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1564 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1565 3a19e99b Sofia Papagiannaki
1566 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1567 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1568 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1569 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1570 3a19e99b Sofia Papagiannaki
1571 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1572 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
1573 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1574 3a19e99b Sofia Papagiannaki
                                     data)
1575 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
1576 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
1577 3a19e99b Sofia Papagiannaki
        else:
1578 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
1579 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
1580 3a19e99b Sofia Papagiannaki
1581 3a19e99b Sofia Papagiannaki
        # check modified object
1582 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1583 3a19e99b Sofia Papagiannaki
1584 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1585 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
1586 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
1587 3a19e99b Sofia Papagiannaki
1588 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_content_length(self):
1589 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1590 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1591 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
1592 3a19e99b Sofia Papagiannaki
                block_size + 1, 2 * block_size))[:2]
1593 3a19e99b Sofia Papagiannaki
1594 3a19e99b Sofia Papagiannaki
        length = len(odata)
1595 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1596 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
1597 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1598 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1599 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1600 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1601 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range,
1602 6ee6677e Sofia Papagiannaki
                  'CONTENT_LENGTH': str(partial + 1)}
1603 3a19e99b Sofia Papagiannaki
1604 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1605 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1606 3a19e99b Sofia Papagiannaki
1607 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1608 3a19e99b Sofia Papagiannaki
1609 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_range(self):
1610 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1611 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1612 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1613 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1614 3a19e99b Sofia Papagiannaki
1615 3a19e99b Sofia Papagiannaki
        length = len(odata)
1616 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1617 3a19e99b Sofia Papagiannaki
        last_byte_pos = first_byte_pos - 1
1618 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1619 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1620 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1621 3a19e99b Sofia Papagiannaki
1622 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1623 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1624 3a19e99b Sofia Papagiannaki
1625 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1626 3a19e99b Sofia Papagiannaki
1627 3a19e99b Sofia Papagiannaki
    def test_update_object_out_of_limits(self):
1628 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1629 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1630 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1631 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1632 3a19e99b Sofia Papagiannaki
1633 3a19e99b Sofia Papagiannaki
        length = len(odata)
1634 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1635 3a19e99b Sofia Papagiannaki
        last_byte_pos = length + 1
1636 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1637 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1638 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1639 3a19e99b Sofia Papagiannaki
1640 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1641 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1642 3a19e99b Sofia Papagiannaki
1643 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1644 3a19e99b Sofia Papagiannaki
1645 3a19e99b Sofia Papagiannaki
    def test_append(self):
1646 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1647 5fe43b8c Sofia Papagiannaki
        length = len(data)
1648 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1649 3a19e99b Sofia Papagiannaki
                        self.object)
1650 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1651 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_LENGTH=str(length),
1652 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*')
1653 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1654 3a19e99b Sofia Papagiannaki
1655 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1656 3a19e99b Sofia Papagiannaki
        content = r.content
1657 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(content), len(self.object_data) + length)
1658 3a19e99b Sofia Papagiannaki
        self.assertEqual(content, self.object_data + data)
1659 3a19e99b Sofia Papagiannaki
1660 5fe43b8c Sofia Papagiannaki
    # TODO Fix the test
1661 5fe43b8c Sofia Papagiannaki
    def _test_update_with_chunked_transfer(self):
1662 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1663 5fe43b8c Sofia Papagiannaki
        length = len(data)
1664 3a19e99b Sofia Papagiannaki
1665 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1666 3a19e99b Sofia Papagiannaki
                        self.object)
1667 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1668 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1669 3a19e99b Sofia Papagiannaki
                      HTTP_TRANSFER_ENCODING='chunked')
1670 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1671 3a19e99b Sofia Papagiannaki
1672 3a19e99b Sofia Papagiannaki
        # check modified object
1673 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1674 3a19e99b Sofia Papagiannaki
        content = r.content
1675 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[0:length], data)
1676 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[length:], self.object_data[length:])
1677 3a19e99b Sofia Papagiannaki
1678 3a19e99b Sofia Papagiannaki
    def test_update_from_other_object(self):
1679 3a19e99b Sofia Papagiannaki
        src = self.object
1680 bc4f25c0 Sofia Papagiannaki
        dest = get_random_name()
1681 3a19e99b Sofia Papagiannaki
1682 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1683 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1684 3a19e99b Sofia Papagiannaki
        source_data = r.content
1685 3a19e99b Sofia Papagiannaki
        source_meta = self.get_object_info(self.container, src)
1686 3a19e99b Sofia Papagiannaki
1687 3a19e99b Sofia Papagiannaki
        # update zero length object
1688 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1689 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
1690 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1691 3a19e99b Sofia Papagiannaki
1692 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1693 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1694 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1695 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1696 3a19e99b Sofia Papagiannaki
1697 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1698 3a19e99b Sofia Papagiannaki
        dest_data = r.content
1699 3a19e99b Sofia Papagiannaki
        dest_meta = self.get_object_info(self.container, dest)
1700 3a19e99b Sofia Papagiannaki
1701 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_data, dest_data)
1702 3a19e99b Sofia Papagiannaki
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1703 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_meta['X-Object-Hash'],
1704 3a19e99b Sofia Papagiannaki
                         dest_meta['X-Object-Hash'])
1705 3a19e99b Sofia Papagiannaki
        self.assertTrue(
1706 3a19e99b Sofia Papagiannaki
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1707 3a19e99b Sofia Papagiannaki
1708 3a19e99b Sofia Papagiannaki
    def test_update_range_from_other_object(self):
1709 3a19e99b Sofia Papagiannaki
        src = self.object
1710 bc4f25c0 Sofia Papagiannaki
        dest = get_random_name()
1711 3a19e99b Sofia Papagiannaki
1712 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1713 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1714 3a19e99b Sofia Papagiannaki
        source_data = r.content
1715 f4b0dbd8 Sofia Papagiannaki
        source_length = len(source_data)
1716 3a19e99b Sofia Papagiannaki
1717 3a19e99b Sofia Papagiannaki
        # update zero length object
1718 c977b0b6 Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1719 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1720 c977b0b6 Sofia Papagiannaki
        initial_data = get_random_data(
1721 c977b0b6 Sofia Papagiannaki
            length=source_length + random.randint(0, block_size))
1722 c977b0b6 Sofia Papagiannaki
1723 a1f429b2 Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1724 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1725 3a19e99b Sofia Papagiannaki
1726 f4b0dbd8 Sofia Papagiannaki
        offset = random.randint(0, source_length - 1)
1727 f4b0dbd8 Sofia Papagiannaki
        upto = random.randint(offset, source_length - 1)
1728 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1729 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1730 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1731 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1732 3a19e99b Sofia Papagiannaki
1733 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1734 3a19e99b Sofia Papagiannaki
        content = r.content
1735 a1f429b2 Sofia Papagiannaki
        self.assertEqual(content, (initial_data[:offset] +
1736 a1f429b2 Sofia Papagiannaki
                                   source_data[:upto - offset + 1] +
1737 a1f429b2 Sofia Papagiannaki
                                   initial_data[upto + 1:]))
1738 3a19e99b Sofia Papagiannaki
1739 e4c7cbeb Sofia Papagiannaki
    def test_update_range_from_invalid_other_object(self):
1740 e4c7cbeb Sofia Papagiannaki
        src = self.object
1741 e4c7cbeb Sofia Papagiannaki
        dest = get_random_name()
1742 e4c7cbeb Sofia Papagiannaki
1743 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1744 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1745 e4c7cbeb Sofia Papagiannaki
1746 e4c7cbeb Sofia Papagiannaki
        # update zero length object
1747 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1748 e4c7cbeb Sofia Papagiannaki
        initial_data = get_random_data()
1749 e4c7cbeb Sofia Papagiannaki
        length = len(initial_data)
1750 e4c7cbeb Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1751 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1752 e4c7cbeb Sofia Papagiannaki
1753 e4c7cbeb Sofia Papagiannaki
        offset = random.randint(1, length - 2)
1754 e4c7cbeb Sofia Papagiannaki
        upto = random.randint(offset, length - 1)
1755 e4c7cbeb Sofia Papagiannaki
1756 e4c7cbeb Sofia Papagiannaki
        # source object does not start with /
1757 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1758 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1759 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1760 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1761 e4c7cbeb Sofia Papagiannaki
1762 e4c7cbeb Sofia Papagiannaki
        # source object does not exist
1763 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1764 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1765 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1766 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1767 e4c7cbeb Sofia Papagiannaki
1768 00064fb8 Sofia Papagiannaki
    def test_restore_version(self):
1769 e4c7cbeb Sofia Papagiannaki
        info = self.get_object_info(self.container, self.object)
1770 00064fb8 Sofia Papagiannaki
        v = []
1771 00064fb8 Sofia Papagiannaki
        append = v.append
1772 00064fb8 Sofia Papagiannaki
        append((info['X-Object-Version'],
1773 00064fb8 Sofia Papagiannaki
                int(info['Content-Length']),
1774 00064fb8 Sofia Papagiannaki
                self.object_data))
1775 e4c7cbeb Sofia Papagiannaki
1776 e4c7cbeb Sofia Papagiannaki
        # update object
1777 00064fb8 Sofia Papagiannaki
        data, r = self.upload_object(self.container, self.object,
1778 00064fb8 Sofia Papagiannaki
                                     length=v[0][1] - 1)[1:]
1779 e4c7cbeb Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1780 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1781 00064fb8 Sofia Papagiannaki
        # v[0][1] > v[1][1]
1782 e4c7cbeb Sofia Papagiannaki
1783 00064fb8 Sofia Papagiannaki
        # update with the previous version
1784 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1785 e4c7cbeb Sofia Papagiannaki
                        self.object)
1786 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1787 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1788 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1789 e4c7cbeb Sofia Papagiannaki
                                                       self.object),
1790 00064fb8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=v[0][0],
1791 00064fb8 Sofia Papagiannaki
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1792 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1793 00064fb8 Sofia Papagiannaki
        # v[2][1] = v[0][1] > v[1][1]
1794 e4c7cbeb Sofia Papagiannaki
1795 e4c7cbeb Sofia Papagiannaki
        # check content
1796 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1797 e4c7cbeb Sofia Papagiannaki
        content = r.content
1798 00064fb8 Sofia Papagiannaki
        self.assertEqual(len(content), v[0][1])
1799 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(content, self.object_data)
1800 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(content), content))
1801 e4c7cbeb Sofia Papagiannaki
1802 00064fb8 Sofia Papagiannaki
        # update object content(v4) > content(v2)
1803 00064fb8 Sofia Papagiannaki
        data, r = self.upload_object(self.container, self.object,
1804 00064fb8 Sofia Papagiannaki
                                     length=v[2][1] + 1)[1:]
1805 e4c7cbeb Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1806 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1807 00064fb8 Sofia Papagiannaki
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1808 e4c7cbeb Sofia Papagiannaki
1809 e4c7cbeb Sofia Papagiannaki
        # update with the previous version
1810 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1811 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1812 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1813 e4c7cbeb Sofia Papagiannaki
                                                       self.object),
1814 00064fb8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=v[2][0],
1815 00064fb8 Sofia Papagiannaki
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1816 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1817 00064fb8 Sofia Papagiannaki
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1818 e4c7cbeb Sofia Papagiannaki
1819 e4c7cbeb Sofia Papagiannaki
        # check content
1820 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1821 00064fb8 Sofia Papagiannaki
        data = r.content
1822 00064fb8 Sofia Papagiannaki
        self.assertEqual(data, v[2][2])
1823 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1824 e4c7cbeb Sofia Papagiannaki
1825 f53483d8 Sofia Papagiannaki
    def test_update_from_other_version(self):
1826 f53483d8 Sofia Papagiannaki
        versions = []
1827 f53483d8 Sofia Papagiannaki
        info = self.get_object_info(self.container, self.object)
1828 f53483d8 Sofia Papagiannaki
        versions.append(info['X-Object-Version'])
1829 f53483d8 Sofia Papagiannaki
        pre_length = int(info['Content-Length'])
1830 f53483d8 Sofia Papagiannaki
1831 f53483d8 Sofia Papagiannaki
        # update object
1832 f53483d8 Sofia Papagiannaki
        d1, r = self.upload_object(self.container, self.object,
1833 f53483d8 Sofia Papagiannaki
                                   length=pre_length - 1)[1:]
1834 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1835 f53483d8 Sofia Papagiannaki
        versions.append(r['X-Object-Version'])
1836 f53483d8 Sofia Papagiannaki
1837 f53483d8 Sofia Papagiannaki
        # update object
1838 f53483d8 Sofia Papagiannaki
        d2, r = self.upload_object(self.container, self.object,
1839 f53483d8 Sofia Papagiannaki
                                   length=pre_length - 2)[1:]
1840 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1841 f53483d8 Sofia Papagiannaki
        versions.append(r['X-Object-Version'])
1842 f53483d8 Sofia Papagiannaki
1843 f53483d8 Sofia Papagiannaki
        # get previous version
1844 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1845 f53483d8 Sofia Papagiannaki
                        self.object)
1846 f53483d8 Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
1847 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1848 f53483d8 Sofia Papagiannaki
        l = json.loads(r.content)['versions']
1849 f53483d8 Sofia Papagiannaki
        self.assertEqual(len(l), 3)
1850 f53483d8 Sofia Papagiannaki
        self.assertEqual([str(v[0]) for v in l], versions)
1851 f53483d8 Sofia Papagiannaki
1852 f53483d8 Sofia Papagiannaki
        # update with the previous version
1853 f53483d8 Sofia Papagiannaki
        r = self.post(url,
1854 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1855 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1856 f53483d8 Sofia Papagiannaki
                                                       self.object),
1857 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=versions[0])
1858 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1859 f53483d8 Sofia Papagiannaki
1860 f53483d8 Sofia Papagiannaki
        # check content
1861 f53483d8 Sofia Papagiannaki
        r = self.get(url)
1862 f53483d8 Sofia Papagiannaki
        content = r.content
1863 f53483d8 Sofia Papagiannaki
        self.assertEqual(len(content), pre_length)
1864 f53483d8 Sofia Papagiannaki
        self.assertEqual(content, self.object_data)
1865 f53483d8 Sofia Papagiannaki
1866 f53483d8 Sofia Papagiannaki
        # update object
1867 f53483d8 Sofia Papagiannaki
        d3, r = self.upload_object(self.container, self.object,
1868 f53483d8 Sofia Papagiannaki
                                   length=len(d2) + 1)[1:]
1869 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1870 f53483d8 Sofia Papagiannaki
        versions.append(r['X-Object-Version'])
1871 f53483d8 Sofia Papagiannaki
1872 f53483d8 Sofia Papagiannaki
        # update with the previous version
1873 f53483d8 Sofia Papagiannaki
        r = self.post(url,
1874 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1875 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1876 f53483d8 Sofia Papagiannaki
                                                       self.object),
1877 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=versions[-2])
1878 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1879 f53483d8 Sofia Papagiannaki
1880 f53483d8 Sofia Papagiannaki
        # check content
1881 f53483d8 Sofia Papagiannaki
        r = self.get(url)
1882 f53483d8 Sofia Papagiannaki
        content = r.content
1883 f53483d8 Sofia Papagiannaki
        self.assertEqual(content, d2 + d3[-1])
1884 f53483d8 Sofia Papagiannaki
1885 6c9df07c Sofia Papagiannaki
    def test_update_invalid_permissions(self):
1886 6c9df07c Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1887 6c9df07c Sofia Papagiannaki
                        self.object)
1888 6c9df07c Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1889 6c9df07c Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1890 6c9df07c Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1891 6c9df07c Sofia Papagiannaki
1892 6c9df07c Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1893 6c9df07c Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1894 6c9df07c Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1895 6c9df07c Sofia Papagiannaki
1896 6c9df07c Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1897 6c9df07c Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1898 6c9df07c Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1899 3a19e99b Sofia Papagiannaki
1900 f019f93d Ilias Tsitsimpis
1901 3a19e99b Sofia Papagiannaki
class ObjectDelete(PithosAPITest):
1902 3a19e99b Sofia Papagiannaki
    def setUp(self):
1903 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
1904 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
1905 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
1906 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
1907 3a19e99b Sofia Papagiannaki
1908 3a19e99b Sofia Papagiannaki
    def test_delete(self):
1909 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1910 3a19e99b Sofia Papagiannaki
                        self.object)
1911 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1912 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1913 3a19e99b Sofia Papagiannaki
1914 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1915 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1916 3a19e99b Sofia Papagiannaki
1917 3a19e99b Sofia Papagiannaki
    def test_delete_non_existent(self):
1918 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1919 bc4f25c0 Sofia Papagiannaki
                        get_random_name())
1920 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1921 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1922 3a19e99b Sofia Papagiannaki
1923 3a19e99b Sofia Papagiannaki
    def test_delete_dir(self):
1924 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1925 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
1926 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1927 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
1928 3a19e99b Sofia Papagiannaki
        append = objects.append
1929 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1930 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_name()),
1931 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
1932 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1933 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_name()),
1934 6ee6677e Sofia Papagiannaki
                                  depth='2')[0])
1935 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1936 3a19e99b Sofia Papagiannaki
1937 3a19e99b Sofia Papagiannaki
        # move dir
1938 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1939 3a19e99b Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url)
1940 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1941 3a19e99b Sofia Papagiannaki
1942 3a19e99b Sofia Papagiannaki
        for obj in objects:
1943 3a19e99b Sofia Papagiannaki
            # assert object does not exist
1944 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1945 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1946 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1947 3a19e99b Sofia Papagiannaki
1948 3a19e99b Sofia Papagiannaki
        # assert other has not been deleted
1949 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, other)
1950 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1951 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)