Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 0c6ab9df

History | View | Annotate | Download (77.7 kB)

1 3a19e99b Sofia Papagiannaki
#!/usr/bin/env python
2 3a19e99b Sofia Papagiannaki
#coding=utf8
3 3a19e99b Sofia Papagiannaki
4 3a19e99b Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 3a19e99b Sofia Papagiannaki
#
6 3a19e99b Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 3a19e99b Sofia Papagiannaki
# without modification, are permitted provided that the following
8 3a19e99b Sofia Papagiannaki
# conditions are met:
9 3a19e99b Sofia Papagiannaki
#
10 3a19e99b Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 3a19e99b Sofia Papagiannaki
#      disclaimer.
13 3a19e99b Sofia Papagiannaki
#
14 3a19e99b Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 3a19e99b Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 3a19e99b Sofia Papagiannaki
#      provided with the distribution.
18 3a19e99b Sofia Papagiannaki
#
19 3a19e99b Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 3a19e99b Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 3a19e99b Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 3a19e99b Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 3a19e99b Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 3a19e99b Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 3a19e99b Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 3a19e99b Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 3a19e99b Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 3a19e99b Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 3a19e99b Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 3a19e99b Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 3a19e99b Sofia Papagiannaki
#
32 3a19e99b Sofia Papagiannaki
# The views and conclusions contained in the software and
33 3a19e99b Sofia Papagiannaki
# documentation are those of the authors and should not be
34 3a19e99b Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 3a19e99b Sofia Papagiannaki
# or implied, of GRNET S.A.
36 3a19e99b Sofia Papagiannaki
37 3a19e99b Sofia Papagiannaki
from collections import defaultdict
38 f53483d8 Sofia Papagiannaki
from urllib import quote, unquote
39 5fe43b8c Sofia Papagiannaki
from functools import partial
40 3a19e99b Sofia Papagiannaki
41 3a19e99b Sofia Papagiannaki
from pithos.api.test import (PithosAPITest, pithos_settings,
42 3a19e99b Sofia Papagiannaki
                             AssertMappingInvariant, AssertUUidInvariant,
43 5fe43b8c Sofia Papagiannaki
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44 0c6ab9df Sofia Papagiannaki
                             DATE_FORMATS, pithos_test_settings)
45 bc4f25c0 Sofia Papagiannaki
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46 bc4f25c0 Sofia Papagiannaki
                                  get_random_data, get_random_name)
47 3a19e99b Sofia Papagiannaki
48 3a19e99b Sofia Papagiannaki
from synnefo.lib import join_urls
49 3a19e99b Sofia Papagiannaki
50 3a19e99b Sofia Papagiannaki
import django.utils.simplejson as json
51 3a19e99b Sofia Papagiannaki
52 3a19e99b Sofia Papagiannaki
import random
53 3a19e99b Sofia Papagiannaki
import re
54 3a19e99b Sofia Papagiannaki
import datetime
55 5fe43b8c Sofia Papagiannaki
import time as _time
56 5fe43b8c Sofia Papagiannaki
57 5fe43b8c Sofia Papagiannaki
merkle = partial(merkle,
58 5fe43b8c Sofia Papagiannaki
                 blocksize=TEST_BLOCK_SIZE,
59 5fe43b8c Sofia Papagiannaki
                 blockhash=TEST_HASH_ALGORITHM)
60 3a19e99b Sofia Papagiannaki
61 3a19e99b Sofia Papagiannaki
62 2a194295 Sofia Papagiannaki
class ObjectHead(PithosAPITest):
63 f53483d8 Sofia Papagiannaki
    def test_get_object_meta(self):
64 2a194295 Sofia Papagiannaki
        cname = self.create_container()[0]
65 2a194295 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
66 2a194295 Sofia Papagiannaki
67 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
68 f53483d8 Sofia Papagiannaki
        r = self.head(url)
69 f53483d8 Sofia Papagiannaki
70 f53483d8 Sofia Papagiannaki
        mandatory = ['Etag',
71 f53483d8 Sofia Papagiannaki
                     'Content-Length',
72 f53483d8 Sofia Papagiannaki
                     'Content-Type',
73 f53483d8 Sofia Papagiannaki
                     'Last-Modified',
74 f53483d8 Sofia Papagiannaki
                     'X-Object-Hash',
75 f53483d8 Sofia Papagiannaki
                     'X-Object-UUID',
76 f53483d8 Sofia Papagiannaki
                     'X-Object-Version',
77 f53483d8 Sofia Papagiannaki
                     'X-Object-Version-Timestamp',
78 f53483d8 Sofia Papagiannaki
                     'X-Object-Modified-By']
79 f53483d8 Sofia Papagiannaki
        for i in mandatory:
80 f53483d8 Sofia Papagiannaki
            self.assertTrue(i in r)
81 f53483d8 Sofia Papagiannaki
82 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='',
83 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_ENCODING='gzip',
84 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_DISPOSITION=(
85 f53483d8 Sofia Papagiannaki
                          'attachment; filename="%s"' % oname))
86 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
87 f53483d8 Sofia Papagiannaki
88 2a194295 Sofia Papagiannaki
        r = self.head(url)
89 f53483d8 Sofia Papagiannaki
        for i in mandatory:
90 f53483d8 Sofia Papagiannaki
            self.assertTrue(i in r)
91 f53483d8 Sofia Papagiannaki
        self.assertTrue('Content-Encoding' in r)
92 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['Content-Encoding'], 'gzip')
93 f53483d8 Sofia Papagiannaki
        self.assertTrue('Content-Disposition' in r)
94 f53483d8 Sofia Papagiannaki
        self.assertEqual(unquote(r['Content-Disposition']),
95 f53483d8 Sofia Papagiannaki
                         'attachment; filename="%s"' % oname)
96 f53483d8 Sofia Papagiannaki
97 f53483d8 Sofia Papagiannaki
        prefix = 'myobject/'
98 f53483d8 Sofia Papagiannaki
        data = ''
99 f53483d8 Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
100 f53483d8 Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
101 f53483d8 Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
102 f53483d8 Sofia Papagiannaki
103 f53483d8 Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
104 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
105 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
106 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
108 f53483d8 Sofia Papagiannaki
109 f53483d8 Sofia Papagiannaki
        r = self.head(url)
110 f53483d8 Sofia Papagiannaki
        for i in mandatory:
111 f53483d8 Sofia Papagiannaki
            self.assertTrue(i in r)
112 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Manifest' in r)
113 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Manifest'], manifest)
114 2a194295 Sofia Papagiannaki
115 2a194295 Sofia Papagiannaki
116 3a19e99b Sofia Papagiannaki
class ObjectGet(PithosAPITest):
117 3a19e99b Sofia Papagiannaki
    def setUp(self):
118 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
119 3a19e99b Sofia Papagiannaki
        self.containers = ['c1', 'c2']
120 3a19e99b Sofia Papagiannaki
121 3a19e99b Sofia Papagiannaki
        # create some containers
122 3a19e99b Sofia Papagiannaki
        for c in self.containers:
123 3a19e99b Sofia Papagiannaki
            self.create_container(c)
124 3a19e99b Sofia Papagiannaki
125 3a19e99b Sofia Papagiannaki
        # upload files
126 3a19e99b Sofia Papagiannaki
        self.objects = defaultdict(list)
127 3a19e99b Sofia Papagiannaki
        self.objects['c1'].append(self.upload_object('c1')[0])
128 3a19e99b Sofia Papagiannaki
129 3a19e99b Sofia Papagiannaki
    def test_versions(self):
130 3a19e99b Sofia Papagiannaki
        c = 'c1'
131 3a19e99b Sofia Papagiannaki
        o = self.objects[c][0]
132 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
133 3a19e99b Sofia Papagiannaki
134 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
136 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
137 3a19e99b Sofia Papagiannaki
138 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
139 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
140 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
141 3a19e99b Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
142 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
143 3a19e99b Sofia Papagiannaki
144 3a19e99b Sofia Papagiannaki
        # update meta
145 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146 3a19e99b Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
148 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
149 3a19e99b Sofia Papagiannaki
150 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
151 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
152 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
153 3a19e99b Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
154 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
155 3a19e99b Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
156 3a19e99b Sofia Papagiannaki
157 3a19e99b Sofia Papagiannaki
        vserial, _ = l2[-2]
158 3a19e99b Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159 6ee6677e Sofia Papagiannaki
                         {'Quality': 'AAA'})
160 3a19e99b Sofia Papagiannaki
161 3a19e99b Sofia Papagiannaki
        # update data
162 3a19e99b Sofia Papagiannaki
        self.append_object_data(c, o)
163 3a19e99b Sofia Papagiannaki
164 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
165 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
166 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
167 3a19e99b Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
168 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
169 3a19e99b Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
170 3a19e99b Sofia Papagiannaki
171 adce84cd Sofia Papagiannaki
    def test_get_version(self):
172 adce84cd Sofia Papagiannaki
        c = 'c1'
173 adce84cd Sofia Papagiannaki
        o = self.objects[c][0]
174 adce84cd Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
175 adce84cd Sofia Papagiannaki
176 adce84cd Sofia Papagiannaki
        # Update metadata
177 adce84cd Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178 adce84cd Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
179 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
180 adce84cd Sofia Papagiannaki
181 adce84cd Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
182 adce84cd Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
183 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
184 adce84cd Sofia Papagiannaki
        l = json.loads(r.content)['versions']
185 adce84cd Sofia Papagiannaki
        self.assertEqual(len(l), 2)
186 adce84cd Sofia Papagiannaki
187 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, l[0][0]))
188 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
189 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Quality' not in r)
190 adce84cd Sofia Papagiannaki
191 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, l[1][0]))
192 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
193 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Quality' in r)
194 adce84cd Sofia Papagiannaki
195 adce84cd Sofia Papagiannaki
        # test invalid version
196 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=-1' % url)
197 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
198 adce84cd Sofia Papagiannaki
199 adce84cd Sofia Papagiannaki
        other_name, other_data, r = self.upload_object(c)
200 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
201 adce84cd Sofia Papagiannaki
        other_version = r['X-Object-Version']
202 adce84cd Sofia Papagiannaki
203 adce84cd Sofia Papagiannaki
        self.assertTrue(o != other_name)
204 adce84cd Sofia Papagiannaki
205 adce84cd Sofia Papagiannaki
        r = self.get('%s?version=%s' % (url, other_version))
206 6ee6677e Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
207 adce84cd Sofia Papagiannaki
208 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, other_version))
209 6ee6677e Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
210 adce84cd Sofia Papagiannaki
211 3a19e99b Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
212 3a19e99b Sofia Papagiannaki
        # create object
213 3a19e99b Sofia Papagiannaki
        oname = self.upload_object('c1')[0]
214 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215 3a19e99b Sofia Papagiannaki
216 3a19e99b Sofia Papagiannaki
        r = self.get(quote('%s ' % url))
217 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
218 3a19e99b Sofia Papagiannaki
219 3a19e99b Sofia Papagiannaki
        # delete object
220 3a19e99b Sofia Papagiannaki
        self.delete(url)
221 3a19e99b Sofia Papagiannaki
222 3a19e99b Sofia Papagiannaki
        r = self.get(url)
223 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
224 3a19e99b Sofia Papagiannaki
225 3a19e99b Sofia Papagiannaki
        # upload object with trailing space
226 bc4f25c0 Sofia Papagiannaki
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227 3a19e99b Sofia Papagiannaki
228 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229 3a19e99b Sofia Papagiannaki
        r = self.get(url)
230 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
231 3a19e99b Sofia Papagiannaki
232 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233 3a19e99b Sofia Papagiannaki
        r = self.get(url)
234 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
235 3a19e99b Sofia Papagiannaki
236 3a19e99b Sofia Papagiannaki
    def test_get_partial(self):
237 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
238 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
239 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
240 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
242 3a19e99b Sofia Papagiannaki
        data = r.content
243 3a19e99b Sofia Papagiannaki
        self.assertEqual(data, odata[:500])
244 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
245 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
247 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248 3a19e99b Sofia Papagiannaki
249 3a19e99b Sofia Papagiannaki
    def test_get_final_500(self):
250 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
251 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
252 3a19e99b Sofia Papagiannaki
        size = len(odata)
253 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
254 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=-500')
255 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
256 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[-500:])
257 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
258 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
259 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
261 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262 3a19e99b Sofia Papagiannaki
263 3a19e99b Sofia Papagiannaki
    def test_get_rest(self):
264 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
265 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
266 3a19e99b Sofia Papagiannaki
        size = len(odata)
267 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
268 3a19e99b Sofia Papagiannaki
        offset = len(odata) - random.randint(1, 512)
269 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
271 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[offset:])
272 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
273 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
274 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
276 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277 3a19e99b Sofia Papagiannaki
278 3a19e99b Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
279 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
280 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
281 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
282 3a19e99b Sofia Papagiannaki
283 3a19e99b Sofia Papagiannaki
        # TODO
284 3a19e99b Sofia Papagiannaki
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
286 3a19e99b Sofia Papagiannaki
287 3a19e99b Sofia Papagiannaki
        offset = len(odata) + 1
288 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
290 3a19e99b Sofia Papagiannaki
291 3a19e99b Sofia Papagiannaki
    def test_multiple_range(self):
292 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
293 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
294 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
295 3a19e99b Sofia Papagiannaki
296 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
297 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
298 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
299 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
300 3a19e99b Sofia Papagiannaki
        self.assertTrue('content-type' in r)
301 3a19e99b Sofia Papagiannaki
        p = re.compile(
302 3a19e99b Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303 3a19e99b Sofia Papagiannaki
            re.I)
304 3a19e99b Sofia Papagiannaki
        m = p.match(r['content-type'])
305 3a19e99b Sofia Papagiannaki
        if m is None:
306 3a19e99b Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
307 3a19e99b Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
308 3a19e99b Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
309 3a19e99b Sofia Papagiannaki
310 3a19e99b Sofia Papagiannaki
        # assert content parts length
311 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
312 3a19e99b Sofia Papagiannaki
313 3a19e99b Sofia Papagiannaki
        # for each content part assert headers
314 3a19e99b Sofia Papagiannaki
        i = 0
315 3a19e99b Sofia Papagiannaki
        for cpart in cparts:
316 3a19e99b Sofia Papagiannaki
            content = cpart.split('\r\n')
317 3a19e99b Sofia Papagiannaki
            headers = content[1:3]
318 3a19e99b Sofia Papagiannaki
            content_range = headers[0].split(': ')
319 3a19e99b Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
320 3a19e99b Sofia Papagiannaki
321 3a19e99b Sofia Papagiannaki
            r = l[i].split('-')
322 3a19e99b Sofia Papagiannaki
            if not r[0] and not r[1]:
323 3a19e99b Sofia Papagiannaki
                pass
324 3a19e99b Sofia Papagiannaki
            elif not r[0]:
325 3a19e99b Sofia Papagiannaki
                start = len(odata) - int(r[1])
326 3a19e99b Sofia Papagiannaki
                end = len(odata)
327 3a19e99b Sofia Papagiannaki
            elif not r[1]:
328 3a19e99b Sofia Papagiannaki
                start = int(r[0])
329 3a19e99b Sofia Papagiannaki
                end = len(odata)
330 3a19e99b Sofia Papagiannaki
            else:
331 3a19e99b Sofia Papagiannaki
                start = int(r[0])
332 3a19e99b Sofia Papagiannaki
                end = int(r[1]) + 1
333 3a19e99b Sofia Papagiannaki
            fdata = odata[start:end]
334 3a19e99b Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
335 3a19e99b Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
336 3a19e99b Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
337 3a19e99b Sofia Papagiannaki
            i += 1
338 3a19e99b Sofia Papagiannaki
339 3a19e99b Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
340 3a19e99b Sofia Papagiannaki
        # perform get with multiple range
341 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
342 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
343 3a19e99b Sofia Papagiannaki
        out_of_range = len(odata) + 1
344 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
345 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
346 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
347 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
348 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
349 3a19e99b Sofia Papagiannaki
350 5fe43b8c Sofia Papagiannaki
    def test_get_if_match(self):
351 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
352 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
353 3a19e99b Sofia Papagiannaki
354 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
355 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
356 3a19e99b Sofia Papagiannaki
357 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
358 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
359 3a19e99b Sofia Papagiannaki
        else:
360 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
361 3a19e99b Sofia Papagiannaki
362 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=etag)
363 3a19e99b Sofia Papagiannaki
364 3a19e99b Sofia Papagiannaki
        # assert get success
365 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
366 3a19e99b Sofia Papagiannaki
367 3a19e99b Sofia Papagiannaki
        # assert response content
368 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
369 3a19e99b Sofia Papagiannaki
370 5fe43b8c Sofia Papagiannaki
    def test_get_if_match_star(self):
371 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
372 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
373 3a19e99b Sofia Papagiannaki
374 3a19e99b Sofia Papagiannaki
        # perform get with If-Match *
375 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
376 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH='*')
377 3a19e99b Sofia Papagiannaki
378 3a19e99b Sofia Papagiannaki
        # assert get success
379 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
380 3a19e99b Sofia Papagiannaki
381 3a19e99b Sofia Papagiannaki
        # assert response content
382 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
383 3a19e99b Sofia Papagiannaki
384 5fe43b8c Sofia Papagiannaki
    def test_get_multiple_if_match(self):
385 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
386 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
387 3a19e99b Sofia Papagiannaki
388 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
389 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
390 3a19e99b Sofia Papagiannaki
391 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
392 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
393 3a19e99b Sofia Papagiannaki
        else:
394 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
395 3a19e99b Sofia Papagiannaki
396 5fe43b8c Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
397 5fe43b8c Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=','.join(
398 5fe43b8c Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
399 3a19e99b Sofia Papagiannaki
400 3a19e99b Sofia Papagiannaki
        # assert get success
401 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
402 3a19e99b Sofia Papagiannaki
403 3a19e99b Sofia Papagiannaki
        # assert response content
404 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
405 3a19e99b Sofia Papagiannaki
406 3a19e99b Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
407 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
408 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
409 3a19e99b Sofia Papagiannaki
410 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
411 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
412 bc4f25c0 Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
414 3a19e99b Sofia Papagiannaki
415 a1f429b2 Sofia Papagiannaki
    def test_if_none_match(self):
416 3a19e99b Sofia Papagiannaki
        # upload object
417 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
418 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
419 3a19e99b Sofia Papagiannaki
420 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
421 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
422 3a19e99b Sofia Papagiannaki
        else:
423 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
424 3a19e99b Sofia Papagiannaki
425 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
426 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
427 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428 3a19e99b Sofia Papagiannaki
429 3a19e99b Sofia Papagiannaki
        # assert precondition_failed
430 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
431 3a19e99b Sofia Papagiannaki
432 3a19e99b Sofia Papagiannaki
        # update object data
433 3a19e99b Sofia Papagiannaki
        r = self.append_object_data(cname, oname)[-1]
434 3a19e99b Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
435 3a19e99b Sofia Papagiannaki
436 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
437 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
438 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439 3a19e99b Sofia Papagiannaki
440 3a19e99b Sofia Papagiannaki
        # assert get success
441 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
442 3a19e99b Sofia Papagiannaki
443 a1f429b2 Sofia Papagiannaki
    def test_if_none_match_star(self):
444 3a19e99b Sofia Papagiannaki
        # upload object
445 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
446 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
447 3a19e99b Sofia Papagiannaki
448 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match with star
449 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
450 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451 3a19e99b Sofia Papagiannaki
452 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
453 3a19e99b Sofia Papagiannaki
454 3a19e99b Sofia Papagiannaki
    def test_if_modified_since(self):
455 3a19e99b Sofia Papagiannaki
        # upload object
456 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
457 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
458 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
459 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
460 3a19e99b Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
462 3a19e99b Sofia Papagiannaki
463 3a19e99b Sofia Papagiannaki
        # Check not modified since
464 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
465 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
466 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
468 3a19e99b Sofia Papagiannaki
469 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
470 3a19e99b Sofia Papagiannaki
471 3a19e99b Sofia Papagiannaki
        # update object data
472 3a19e99b Sofia Papagiannaki
        appended_data = self.append_object_data(cname, oname)[1]
473 3a19e99b Sofia Papagiannaki
474 3a19e99b Sofia Papagiannaki
        # Check modified since
475 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
476 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
477 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
479 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata + appended_data)
480 3a19e99b Sofia Papagiannaki
481 3a19e99b Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
482 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
483 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
484 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
485 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
487 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
488 3a19e99b Sofia Papagiannaki
489 3a19e99b Sofia Papagiannaki
    def test_if_not_modified_since(self):
490 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
491 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
492 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
493 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
494 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
495 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496 3a19e99b Sofia Papagiannaki
497 3a19e99b Sofia Papagiannaki
        # Check unmodified
498 3a19e99b Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
499 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
500 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
501 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
503 2a194295 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
504 3a19e99b Sofia Papagiannaki
505 3a19e99b Sofia Papagiannaki
        # modify object
506 3a19e99b Sofia Papagiannaki
        _time.sleep(2)
507 3a19e99b Sofia Papagiannaki
        self.append_object_data(cname, oname)
508 3a19e99b Sofia Papagiannaki
509 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
510 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
511 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512 3a19e99b Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
513 3a19e99b Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
514 3a19e99b Sofia Papagiannaki
515 5fe43b8c Sofia Papagiannaki
        # check modified
516 3a19e99b Sofia Papagiannaki
        for t in t2_formats:
517 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
519 3a19e99b Sofia Papagiannaki
520 800af189 Sofia Papagiannaki
        # modify account: update object meta
521 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
522 3a19e99b Sofia Papagiannaki
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523 3a19e99b Sofia Papagiannaki
524 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
525 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
526 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527 3a19e99b Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
528 3a19e99b Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
529 3a19e99b Sofia Papagiannaki
530 5fe43b8c Sofia Papagiannaki
        # check modified
531 3a19e99b Sofia Papagiannaki
        for t in t3_formats:
532 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
534 3a19e99b Sofia Papagiannaki
535 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since(self):
536 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
537 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
538 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
539 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
540 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
541 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542 3a19e99b Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
543 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
544 3a19e99b Sofia Papagiannaki
545 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
546 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
548 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata)
549 3a19e99b Sofia Papagiannaki
550 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
551 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
552 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
553 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
554 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
555 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
556 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557 3a19e99b Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
558 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
559 3a19e99b Sofia Papagiannaki
560 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
561 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
563 3a19e99b Sofia Papagiannaki
564 3a19e99b Sofia Papagiannaki
    def test_hashes(self):
565 3a19e99b Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
567 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=l)[:-1]
568 3a19e99b Sofia Papagiannaki
        size = len(odata)
569 3a19e99b Sofia Papagiannaki
570 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
571 3a19e99b Sofia Papagiannaki
        r = self.get('%s?format=json&hashmap' % url)
572 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
573 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
574 3a19e99b Sofia Papagiannaki
575 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
576 3a19e99b Sofia Papagiannaki
        block_size = body['block_size']
577 3a19e99b Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
578 3a19e99b Sofia Papagiannaki
            size / block_size + 1
579 3a19e99b Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
580 3a19e99b Sofia Papagiannaki
        i = 0
581 3a19e99b Sofia Papagiannaki
        for h in hashes:
582 3a19e99b Sofia Papagiannaki
            start = i * block_size
583 3a19e99b Sofia Papagiannaki
            end = (i + 1) * block_size
584 5fe43b8c Sofia Papagiannaki
            hash = merkle(odata[start:end])
585 3a19e99b Sofia Papagiannaki
            self.assertEqual(h, hash)
586 3a19e99b Sofia Papagiannaki
            i += 1
587 3a19e99b Sofia Papagiannaki
588 3a19e99b Sofia Papagiannaki
589 3a19e99b Sofia Papagiannaki
class ObjectPut(PithosAPITest):
590 3a19e99b Sofia Papagiannaki
    def setUp(self):
591 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
592 bc4f25c0 Sofia Papagiannaki
        self.container = get_random_name()
593 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
594 3a19e99b Sofia Papagiannaki
595 3a19e99b Sofia Papagiannaki
    def test_upload(self):
596 3a19e99b Sofia Papagiannaki
        cname = self.container
597 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
598 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
599 3a19e99b Sofia Papagiannaki
        meta = {'test': 'test1'}
600 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
602 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
603 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, content_type='application/pdf', **headers)
604 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
605 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
606 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
607 3a19e99b Sofia Papagiannaki
608 3a19e99b Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
609 3a19e99b Sofia Papagiannaki
610 3a19e99b Sofia Papagiannaki
        # assert object meta
611 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in info)
612 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
613 3a19e99b Sofia Papagiannaki
614 3a19e99b Sofia Papagiannaki
        # assert content-type
615 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['content-type'], 'application/pdf')
616 3a19e99b Sofia Papagiannaki
617 3a19e99b Sofia Papagiannaki
        # assert uploaded content
618 3a19e99b Sofia Papagiannaki
        r = self.get(url)
619 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
620 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
621 3a19e99b Sofia Papagiannaki
622 3a19e99b Sofia Papagiannaki
    def test_maximum_upload_size_exceeds(self):
623 3a19e99b Sofia Papagiannaki
        cname = self.container
624 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
625 3a19e99b Sofia Papagiannaki
626 3a19e99b Sofia Papagiannaki
        # set container quota to 100
627 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname)
628 3a19e99b Sofia Papagiannaki
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
629 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
630 3a19e99b Sofia Papagiannaki
631 3a19e99b Sofia Papagiannaki
        info = self.get_container_info(cname)
632 3a19e99b Sofia Papagiannaki
        length = int(info['X-Container-Policy-Quota']) + 1
633 3a19e99b Sofia Papagiannaki
634 3a19e99b Sofia Papagiannaki
        data = get_random_data(length)
635 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
636 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
637 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 413)
638 3a19e99b Sofia Papagiannaki
639 3a19e99b Sofia Papagiannaki
    def test_upload_with_name_containing_slash(self):
640 3a19e99b Sofia Papagiannaki
        cname = self.container
641 bc4f25c0 Sofia Papagiannaki
        oname = '/%s' % get_random_name()
642 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
643 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
644 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
645 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
646 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
647 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
648 3a19e99b Sofia Papagiannaki
649 3a19e99b Sofia Papagiannaki
        r = self.get(url)
650 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
651 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
652 3a19e99b Sofia Papagiannaki
653 3a19e99b Sofia Papagiannaki
    def test_upload_unprocessable_entity(self):
654 3a19e99b Sofia Papagiannaki
        cname = self.container
655 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
656 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
657 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
658 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, HTTP_ETAG='123')
659 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 422)
660 3a19e99b Sofia Papagiannaki
661 3a19e99b Sofia Papagiannaki
#    def test_chunked_transfer(self):
662 3a19e99b Sofia Papagiannaki
#        cname = self.container
663 bc4f25c0 Sofia Papagiannaki
#        oname = '/%s' % get_random_name()
664 5fe43b8c Sofia Papagiannaki
#        data = get_random_data()
665 3a19e99b Sofia Papagiannaki
#        url = join_urls(self.pithos_path, self.user, cname, oname)
666 3a19e99b Sofia Papagiannaki
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
667 3a19e99b Sofia Papagiannaki
#        self.assertEqual(r.status_code, 201)
668 3a19e99b Sofia Papagiannaki
#        self.assertTrue('ETag' in r)
669 3a19e99b Sofia Papagiannaki
#        self.assertTrue('X-Object-Version' in r)
670 3a19e99b Sofia Papagiannaki
671 3a19e99b Sofia Papagiannaki
    def test_manifestation(self):
672 3a19e99b Sofia Papagiannaki
        cname = self.container
673 3a19e99b Sofia Papagiannaki
        prefix = 'myobject/'
674 3a19e99b Sofia Papagiannaki
        data = ''
675 3a19e99b Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
676 3a19e99b Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
677 3a19e99b Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
678 3a19e99b Sofia Papagiannaki
679 3a19e99b Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
680 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
681 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
682 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
683 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
684 3a19e99b Sofia Papagiannaki
685 3a19e99b Sofia Papagiannaki
        # assert object exists
686 3a19e99b Sofia Papagiannaki
        r = self.get(url)
687 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
688 3a19e99b Sofia Papagiannaki
689 3a19e99b Sofia Papagiannaki
        # assert its content
690 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
691 3a19e99b Sofia Papagiannaki
692 3a19e99b Sofia Papagiannaki
        # invalid manifestation
693 bc4f25c0 Sofia Papagiannaki
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
694 3a19e99b Sofia Papagiannaki
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
695 3a19e99b Sofia Papagiannaki
        r = self.get(url)
696 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
697 3a19e99b Sofia Papagiannaki
698 3a19e99b Sofia Papagiannaki
    def test_create_zero_length_object(self):
699 3a19e99b Sofia Papagiannaki
        cname = self.container
700 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
701 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
702 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
703 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
704 3a19e99b Sofia Papagiannaki
705 3a19e99b Sofia Papagiannaki
        r = self.get(url)
706 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
707 3a19e99b Sofia Papagiannaki
        self.assertEqual(int(r['Content-Length']), 0)
708 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
709 3a19e99b Sofia Papagiannaki
710 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
711 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
712 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
713 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
714 5fe43b8c Sofia Papagiannaki
        hash = merkle('')
715 3a19e99b Sofia Papagiannaki
        self.assertEqual(hashes, [hash])
716 3a19e99b Sofia Papagiannaki
717 3a19e99b Sofia Papagiannaki
    def test_create_object_by_hashmap(self):
718 3a19e99b Sofia Papagiannaki
        cname = self.container
719 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
720 3a19e99b Sofia Papagiannaki
721 3a19e99b Sofia Papagiannaki
        # upload an object
722 3a19e99b Sofia Papagiannaki
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
723 3a19e99b Sofia Papagiannaki
        # get it hashmap
724 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
725 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
726 3a19e99b Sofia Papagiannaki
727 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
728 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
729 3a19e99b Sofia Papagiannaki
        r = self.put('%s?hashmap=' % url, data=r.content)
730 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
731 3a19e99b Sofia Papagiannaki
732 3a19e99b Sofia Papagiannaki
        r = self.get(url)
733 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
734 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
735 3a19e99b Sofia Papagiannaki
736 3a5994a8 Sofia Papagiannaki
    def test_create_object_by_invalid_hashmap(self):
737 3a5994a8 Sofia Papagiannaki
        cname = self.container
738 3a5994a8 Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
739 3a5994a8 Sofia Papagiannaki
740 3a5994a8 Sofia Papagiannaki
        # upload an object
741 3a5994a8 Sofia Papagiannaki
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
742 3a5994a8 Sofia Papagiannaki
        # get it hashmap
743 3a5994a8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
744 3a5994a8 Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
745 3a5994a8 Sofia Papagiannaki
        data = r.content
746 3a5994a8 Sofia Papagiannaki
        try:
747 3a5994a8 Sofia Papagiannaki
            hashmap = json.loads(data)
748 3a5994a8 Sofia Papagiannaki
        except:
749 3a5994a8 Sofia Papagiannaki
            self.fail('JSON format expected')
750 3a5994a8 Sofia Papagiannaki
751 3a5994a8 Sofia Papagiannaki
        oname = get_random_name()
752 3a5994a8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
753 3a5994a8 Sofia Papagiannaki
        hashmap['hashes'] = [get_random_name()]
754 3a5994a8 Sofia Papagiannaki
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
755 3a5994a8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
756 3a5994a8 Sofia Papagiannaki
757 3a19e99b Sofia Papagiannaki
758 f53483d8 Sofia Papagiannaki
class ObjectPutCopy(PithosAPITest):
759 3a19e99b Sofia Papagiannaki
    def setUp(self):
760 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
761 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
762 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
763 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
764 3a19e99b Sofia Papagiannaki
765 3a19e99b Sofia Papagiannaki
        url = join_urls(
766 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
767 3a19e99b Sofia Papagiannaki
        r = self.head(url)
768 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
769 3a19e99b Sofia Papagiannaki
770 3a19e99b Sofia Papagiannaki
    def test_copy(self):
771 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
772 3a19e99b Sofia Papagiannaki
                                    self.object):
773 3a19e99b Sofia Papagiannaki
            # copy object
774 bc4f25c0 Sofia Papagiannaki
            oname = get_random_name()
775 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, oname)
776 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
777 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
778 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
779 3a19e99b Sofia Papagiannaki
780 3a19e99b Sofia Papagiannaki
            # assert copy success
781 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
782 3a19e99b Sofia Papagiannaki
783 3a19e99b Sofia Papagiannaki
            # assert access the new object
784 3a19e99b Sofia Papagiannaki
            r = self.head(url)
785 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
786 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
787 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
788 3a19e99b Sofia Papagiannaki
789 3a19e99b Sofia Papagiannaki
            # assert etag is the same
790 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
791 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
792 3a19e99b Sofia Papagiannaki
793 3a19e99b Sofia Papagiannaki
    def test_copy_from_different_container(self):
794 3a19e99b Sofia Papagiannaki
        cname = 'c2'
795 3a19e99b Sofia Papagiannaki
        self.create_container(cname)
796 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
797 3a19e99b Sofia Papagiannaki
                                    self.object):
798 bc4f25c0 Sofia Papagiannaki
            oname = get_random_name()
799 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname, oname)
800 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
801 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
802 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
803 3a19e99b Sofia Papagiannaki
804 3a19e99b Sofia Papagiannaki
            # assert copy success
805 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
806 3a19e99b Sofia Papagiannaki
807 3a19e99b Sofia Papagiannaki
            # assert access the new object
808 3a19e99b Sofia Papagiannaki
            r = self.head(url)
809 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
810 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
811 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
812 3a19e99b Sofia Papagiannaki
813 3a19e99b Sofia Papagiannaki
            # assert etag is the same
814 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
815 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
816 3a19e99b Sofia Papagiannaki
817 f53483d8 Sofia Papagiannaki
    def test_copy_from_other_account(self):
818 f53483d8 Sofia Papagiannaki
        cname = 'c2'
819 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='chuck')
820 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='alice')
821 f53483d8 Sofia Papagiannaki
822 f53483d8 Sofia Papagiannaki
        # share object for read with alice
823 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
824 f53483d8 Sofia Papagiannaki
                        self.object)
825 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
826 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=alice')
827 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
828 f53483d8 Sofia Papagiannaki
829 f53483d8 Sofia Papagiannaki
        # assert not allowed for chuck
830 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
831 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
832 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='chuck',
833 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
834 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
835 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
836 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
837 f53483d8 Sofia Papagiannaki
838 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
839 f53483d8 Sofia Papagiannaki
840 f53483d8 Sofia Papagiannaki
        # assert copy success for alice
841 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
842 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
843 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
844 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
845 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
846 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
847 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
848 f53483d8 Sofia Papagiannaki
849 f53483d8 Sofia Papagiannaki
        # assert access the new object
850 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
851 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
852 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
853 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
854 f53483d8 Sofia Papagiannaki
855 f53483d8 Sofia Papagiannaki
        # assert etag is the same
856 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
857 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
858 f53483d8 Sofia Papagiannaki
859 f53483d8 Sofia Papagiannaki
        # share object for write
860 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
861 f53483d8 Sofia Papagiannaki
                        self.object)
862 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
863 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=dan')
864 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
865 f53483d8 Sofia Papagiannaki
866 f53483d8 Sofia Papagiannaki
        # assert not allowed copy for alice
867 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
868 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
869 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
870 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
871 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
872 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
873 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
874 f53483d8 Sofia Papagiannaki
875 f53483d8 Sofia Papagiannaki
        # assert allowed copy for dan
876 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='dan')
877 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'dan', cname, oname)
878 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='dan',
879 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
880 f53483d8 Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
881 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
882 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
883 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
884 f53483d8 Sofia Papagiannaki
885 f53483d8 Sofia Papagiannaki
        # assert access the new object
886 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='dan')
887 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
888 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
889 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
890 f53483d8 Sofia Papagiannaki
891 f53483d8 Sofia Papagiannaki
        # assert etag is the same
892 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
893 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
894 f53483d8 Sofia Papagiannaki
895 f53483d8 Sofia Papagiannaki
        # assert source object still exists
896 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
897 f53483d8 Sofia Papagiannaki
                        self.object)
898 f53483d8 Sofia Papagiannaki
        r = self.head(url)
899 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
900 f53483d8 Sofia Papagiannaki
        # assert etag is the same
901 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
902 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
903 f53483d8 Sofia Papagiannaki
904 f53483d8 Sofia Papagiannaki
        r = self.get(url)
905 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
906 f53483d8 Sofia Papagiannaki
        # assert etag is the same
907 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
908 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
909 f53483d8 Sofia Papagiannaki
910 3a19e99b Sofia Papagiannaki
    def test_copy_invalid(self):
911 3a19e99b Sofia Papagiannaki
        # copy from non-existent object
912 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
913 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
914 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
915 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
916 bc4f25c0 Sofia Papagiannaki
                         self.container, get_random_name()))
917 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
918 3a19e99b Sofia Papagiannaki
919 3a19e99b Sofia Papagiannaki
        # copy from non-existent container
920 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
921 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
922 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
923 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
924 bc4f25c0 Sofia Papagiannaki
                         get_random_name(), self.object))
925 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
926 3a19e99b Sofia Papagiannaki
927 0c6ab9df Sofia Papagiannaki
    @pithos_test_settings(API_LIST_LIMIT=10)
928 3a19e99b Sofia Papagiannaki
    def test_copy_dir(self):
929 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
930 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
931 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
932 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
933 3a19e99b Sofia Papagiannaki
        append = objects.append
934 0c6ab9df Sofia Papagiannaki
        for i in range(11):
935 0c6ab9df Sofia Papagiannaki
            append(self.upload_object(self.container,
936 0c6ab9df Sofia Papagiannaki
                                      '%s/%d' % (folder, i),
937 0c6ab9df Sofia Papagiannaki
                                      depth='1')[0])
938 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
939 3a19e99b Sofia Papagiannaki
940 3a19e99b Sofia Papagiannaki
        # copy dir
941 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
942 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
943 3a19e99b Sofia Papagiannaki
                        copy_folder)
944 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
945 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
946 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
947 3a19e99b Sofia Papagiannaki
948 3a19e99b Sofia Papagiannaki
        for obj in objects:
949 3a19e99b Sofia Papagiannaki
            # assert object exists
950 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
951 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
952 3a19e99b Sofia Papagiannaki
            r = self.head(url)
953 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
954 3a19e99b Sofia Papagiannaki
955 3a19e99b Sofia Papagiannaki
            # assert metadata
956 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, obj)
957 3a19e99b Sofia Papagiannaki
            for k in meta.keys():
958 6ee6677e Sofia Papagiannaki
                key = 'X-Object-Meta-%s' % k
959 6ee6677e Sofia Papagiannaki
                self.assertTrue(key in r)
960 6ee6677e Sofia Papagiannaki
                self.assertEqual(r[key], meta[k])
961 3a19e99b Sofia Papagiannaki
962 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
963 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
964 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
965 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
966 3a19e99b Sofia Papagiannaki
        r = self.head(url)
967 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
968 3a19e99b Sofia Papagiannaki
969 3a19e99b Sofia Papagiannaki
970 f53483d8 Sofia Papagiannaki
class ObjectPutMove(PithosAPITest):
971 3a19e99b Sofia Papagiannaki
    def setUp(self):
972 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
973 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
974 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
975 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
976 3a19e99b Sofia Papagiannaki
977 3a19e99b Sofia Papagiannaki
        url = join_urls(
978 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
979 3a19e99b Sofia Papagiannaki
        r = self.head(url)
980 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
981 3a19e99b Sofia Papagiannaki
982 3a19e99b Sofia Papagiannaki
    def test_move(self):
983 3a19e99b Sofia Papagiannaki
        # move object
984 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
985 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
986 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
987 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
988 3a19e99b Sofia Papagiannaki
                         self.container, self.object))
989 3a19e99b Sofia Papagiannaki
990 3a19e99b Sofia Papagiannaki
        # assert move success
991 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
992 3a19e99b Sofia Papagiannaki
993 3a19e99b Sofia Papagiannaki
        # assert access the new object
994 3a19e99b Sofia Papagiannaki
        r = self.head(url)
995 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
996 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
997 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
998 3a19e99b Sofia Papagiannaki
999 3a19e99b Sofia Papagiannaki
        # assert etag is the same
1000 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1001 3a19e99b Sofia Papagiannaki
1002 3a19e99b Sofia Papagiannaki
        # assert the initial object has been deleted
1003 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1004 3a19e99b Sofia Papagiannaki
                        self.object)
1005 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1006 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1007 3a19e99b Sofia Papagiannaki
1008 0c6ab9df Sofia Papagiannaki
    @pithos_test_settings(API_LIST_LIMIT=10)
1009 3a19e99b Sofia Papagiannaki
    def test_move_dir(self):
1010 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1011 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
1012 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1013 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
1014 3a19e99b Sofia Papagiannaki
        append = objects.append
1015 0c6ab9df Sofia Papagiannaki
        for i in range(11):
1016 0c6ab9df Sofia Papagiannaki
            append(self.upload_object(self.container,
1017 0c6ab9df Sofia Papagiannaki
                                      '%s/%d' % (folder, i),
1018 0c6ab9df Sofia Papagiannaki
                                      depth='1')[0])
1019 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1020 3a19e99b Sofia Papagiannaki
1021 3a19e99b Sofia Papagiannaki
        # move dir
1022 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
1023 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1024 3a19e99b Sofia Papagiannaki
                        copy_folder)
1025 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
1026 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1027 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1028 3a19e99b Sofia Papagiannaki
1029 3a19e99b Sofia Papagiannaki
        for obj in objects:
1030 3a19e99b Sofia Papagiannaki
            # assert initial object does not exist
1031 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1032 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1033 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1034 3a19e99b Sofia Papagiannaki
1035 3a19e99b Sofia Papagiannaki
            # assert new object was created
1036 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1037 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
1038 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1039 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1040 3a19e99b Sofia Papagiannaki
1041 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
1042 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1043 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
1044 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
1045 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1046 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1047 3a19e99b Sofia Papagiannaki
1048 f53483d8 Sofia Papagiannaki
    def test_move_from_other_account(self):
1049 f53483d8 Sofia Papagiannaki
        cname = 'c2'
1050 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='chuck')
1051 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='alice')
1052 f53483d8 Sofia Papagiannaki
1053 f53483d8 Sofia Papagiannaki
        # share object for read with alice
1054 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1055 f53483d8 Sofia Papagiannaki
                        self.object)
1056 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1057 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=alice')
1058 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1059 f53483d8 Sofia Papagiannaki
1060 f53483d8 Sofia Papagiannaki
        # assert not allowed move for chuck
1061 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1062 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1063 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='chuck',
1064 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1065 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1066 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1067 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1068 f53483d8 Sofia Papagiannaki
1069 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1070 f53483d8 Sofia Papagiannaki
1071 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1072 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='chuck')
1073 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1074 f53483d8 Sofia Papagiannaki
1075 f53483d8 Sofia Papagiannaki
        # assert not allowed move for alice
1076 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1077 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
1078 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1079 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1080 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1081 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1082 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1083 f53483d8 Sofia Papagiannaki
1084 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1085 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
1086 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1087 f53483d8 Sofia Papagiannaki
1088 f53483d8 Sofia Papagiannaki
        # share object for write with dan
1089 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1090 f53483d8 Sofia Papagiannaki
                        self.object)
1091 f53483d8 Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1092 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=dan')
1093 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1094 f53483d8 Sofia Papagiannaki
1095 f53483d8 Sofia Papagiannaki
        # assert not allowed move for alice
1096 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1097 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='alice',
1098 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1099 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1100 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1101 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1102 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1103 f53483d8 Sofia Papagiannaki
1104 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1105 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
1106 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1107 f53483d8 Sofia Papagiannaki
1108 f53483d8 Sofia Papagiannaki
        # assert not allowed move for dan
1109 f53483d8 Sofia Papagiannaki
        self.create_container(cname, user='dan')
1110 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1111 f53483d8 Sofia Papagiannaki
        r = self.put(url, data='', user='dan',
1112 f53483d8 Sofia Papagiannaki
                     HTTP_X_OBJECT_META_TEST='testcopy',
1113 f53483d8 Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1114 f53483d8 Sofia Papagiannaki
                         self.container, self.object),
1115 f53483d8 Sofia Papagiannaki
                     HTTP_X_SOURCE_ACCOUNT='user')
1116 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1117 f53483d8 Sofia Papagiannaki
1118 f53483d8 Sofia Papagiannaki
        # assert no new object was created
1119 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='dan')
1120 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1121 f53483d8 Sofia Papagiannaki
1122 f53483d8 Sofia Papagiannaki
1123 f53483d8 Sofia Papagiannaki
class ObjectCopy(PithosAPITest):
1124 f53483d8 Sofia Papagiannaki
    def setUp(self):
1125 f53483d8 Sofia Papagiannaki
        PithosAPITest.setUp(self)
1126 f53483d8 Sofia Papagiannaki
        self.container = 'c1'
1127 f53483d8 Sofia Papagiannaki
        self.create_container(self.container)
1128 f53483d8 Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
1129 f53483d8 Sofia Papagiannaki
1130 f53483d8 Sofia Papagiannaki
        url = join_urls(
1131 f53483d8 Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
1132 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1133 f53483d8 Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
1134 f53483d8 Sofia Papagiannaki
1135 f53483d8 Sofia Papagiannaki
    def test_copy(self):
1136 f53483d8 Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
1137 f53483d8 Sofia Papagiannaki
                                    self.object):
1138 f53483d8 Sofia Papagiannaki
            oname = get_random_name()
1139 f53483d8 Sofia Papagiannaki
            # copy object
1140 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1141 f53483d8 Sofia Papagiannaki
                            self.object)
1142 f53483d8 Sofia Papagiannaki
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1143 f53483d8 Sofia Papagiannaki
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1144 f53483d8 Sofia Papagiannaki
                                                       oname))
1145 f53483d8 Sofia Papagiannaki
            # assert copy success
1146 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1147 f53483d8 Sofia Papagiannaki
                            oname)
1148 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
1149 f53483d8 Sofia Papagiannaki
1150 f53483d8 Sofia Papagiannaki
            # assert access the new object
1151 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1152 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1153 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
1154 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1155 f53483d8 Sofia Papagiannaki
1156 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1157 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1158 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1159 f53483d8 Sofia Papagiannaki
1160 f53483d8 Sofia Papagiannaki
            # assert source object still exists
1161 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1162 f53483d8 Sofia Papagiannaki
                            self.object)
1163 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1164 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1165 f53483d8 Sofia Papagiannaki
1166 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1167 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1168 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1169 f53483d8 Sofia Papagiannaki
1170 f53483d8 Sofia Papagiannaki
            r = self.get(url)
1171 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1172 f53483d8 Sofia Papagiannaki
1173 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1174 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1175 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1176 f53483d8 Sofia Papagiannaki
1177 f53483d8 Sofia Papagiannaki
            # copy object to other container (not existing)
1178 f53483d8 Sofia Papagiannaki
            cname = get_random_name()
1179 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1180 f53483d8 Sofia Papagiannaki
                            self.object)
1181 f53483d8 Sofia Papagiannaki
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1182 f53483d8 Sofia Papagiannaki
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1183 f53483d8 Sofia Papagiannaki
1184 f53483d8 Sofia Papagiannaki
            # assert destination container does not exist
1185 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname,
1186 f53483d8 Sofia Papagiannaki
                            self.object)
1187 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1188 f53483d8 Sofia Papagiannaki
1189 f53483d8 Sofia Papagiannaki
            # create container
1190 f53483d8 Sofia Papagiannaki
            self.create_container(cname)
1191 f53483d8 Sofia Papagiannaki
1192 f53483d8 Sofia Papagiannaki
            # copy object to other container (existing)
1193 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1194 f53483d8 Sofia Papagiannaki
                            self.object)
1195 f53483d8 Sofia Papagiannaki
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1196 f53483d8 Sofia Papagiannaki
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1197 f53483d8 Sofia Papagiannaki
1198 f53483d8 Sofia Papagiannaki
            # assert copy success
1199 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname,
1200 f53483d8 Sofia Papagiannaki
                            self.object)
1201 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
1202 f53483d8 Sofia Papagiannaki
1203 f53483d8 Sofia Papagiannaki
            # assert access the new object
1204 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1205 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1206 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
1207 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1208 f53483d8 Sofia Papagiannaki
1209 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1210 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1211 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1212 f53483d8 Sofia Papagiannaki
1213 0c6ab9df Sofia Papagiannaki
            # assert source object still exists
1214 f53483d8 Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1215 f53483d8 Sofia Papagiannaki
                            self.object)
1216 f53483d8 Sofia Papagiannaki
            r = self.head(url)
1217 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1218 f53483d8 Sofia Papagiannaki
1219 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1220 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1221 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1222 f53483d8 Sofia Papagiannaki
1223 f53483d8 Sofia Papagiannaki
            r = self.get(url)
1224 f53483d8 Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1225 f53483d8 Sofia Papagiannaki
1226 f53483d8 Sofia Papagiannaki
            # assert etag is the same
1227 f53483d8 Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
1228 f53483d8 Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
1229 f53483d8 Sofia Papagiannaki
1230 0c6ab9df Sofia Papagiannaki
    @pithos_test_settings(API_LIST_LIMIT=10)
1231 0c6ab9df Sofia Papagiannaki
    def test_copy_dir_contents(self):
1232 0c6ab9df Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1233 0c6ab9df Sofia Papagiannaki
        subfolder = self.create_folder(
1234 0c6ab9df Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1235 0c6ab9df Sofia Papagiannaki
        objects = [subfolder]
1236 0c6ab9df Sofia Papagiannaki
        append = objects.append
1237 0c6ab9df Sofia Papagiannaki
        for i in range(11):
1238 0c6ab9df Sofia Papagiannaki
            append(self.upload_object(self.container,
1239 0c6ab9df Sofia Papagiannaki
                                      '%s/%d' % (folder, i),
1240 0c6ab9df Sofia Papagiannaki
                                      depth='1')[0])
1241 0c6ab9df Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1242 0c6ab9df Sofia Papagiannaki
1243 0c6ab9df Sofia Papagiannaki
        # copy dir
1244 0c6ab9df Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1245 0c6ab9df Sofia Papagiannaki
                        folder)
1246 0c6ab9df Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
1247 0c6ab9df Sofia Papagiannaki
        r = self.copy('%s?delimiter=/' % url,
1248 0c6ab9df Sofia Papagiannaki
                      HTTP_X_OBJECT_META_TEST='testcopy',
1249 0c6ab9df Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1250 0c6ab9df Sofia Papagiannaki
                                                   copy_folder))
1251 0c6ab9df Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1252 0c6ab9df Sofia Papagiannaki
1253 0c6ab9df Sofia Papagiannaki
        for obj in objects:
1254 0c6ab9df Sofia Papagiannaki
            # assert object exists
1255 0c6ab9df Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1256 0c6ab9df Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
1257 0c6ab9df Sofia Papagiannaki
            r = self.head(url)
1258 0c6ab9df Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1259 0c6ab9df Sofia Papagiannaki
1260 0c6ab9df Sofia Papagiannaki
        # assert other has not been created under copy folder
1261 0c6ab9df Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1262 0c6ab9df Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
1263 0c6ab9df Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
1264 0c6ab9df Sofia Papagiannaki
        r = self.head(url)
1265 0c6ab9df Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1266 0c6ab9df Sofia Papagiannaki
1267 f53483d8 Sofia Papagiannaki
    def test_copy_to_other_account(self):
1268 f53483d8 Sofia Papagiannaki
        # create a container under alice account
1269 f53483d8 Sofia Papagiannaki
        cname = self.create_container(user='alice')[0]
1270 f53483d8 Sofia Papagiannaki
1271 f53483d8 Sofia Papagiannaki
        # create a folder under this container
1272 f53483d8 Sofia Papagiannaki
        folder = self.create_folder(cname, user='alice')[0]
1273 f53483d8 Sofia Papagiannaki
1274 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1275 f53483d8 Sofia Papagiannaki
1276 f53483d8 Sofia Papagiannaki
        # copy object to other account container
1277 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1278 f53483d8 Sofia Papagiannaki
                        self.object)
1279 f53483d8 Sofia Papagiannaki
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1280 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1281 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1282 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1283 f53483d8 Sofia Papagiannaki
1284 f53483d8 Sofia Papagiannaki
        # share object for read with user
1285 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1286 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice', content_type='',
1287 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1288 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1289 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1290 f53483d8 Sofia Papagiannaki
1291 f53483d8 Sofia Papagiannaki
        # assert copy object still is not allowed
1292 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1293 f53483d8 Sofia Papagiannaki
                        self.object)
1294 f53483d8 Sofia Papagiannaki
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1295 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1296 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1297 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1298 f53483d8 Sofia Papagiannaki
1299 f53483d8 Sofia Papagiannaki
        # share object for write with user
1300 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1301 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice',  content_type='',
1302 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1303 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1304 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1305 f53483d8 Sofia Papagiannaki
1306 f53483d8 Sofia Papagiannaki
        # assert copy object now is allowed
1307 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1308 f53483d8 Sofia Papagiannaki
                        self.object)
1309 f53483d8 Sofia Papagiannaki
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1310 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1311 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1312 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1313 f53483d8 Sofia Papagiannaki
1314 f53483d8 Sofia Papagiannaki
        # assert access the new object
1315 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1316 f53483d8 Sofia Papagiannaki
        r = self.head(url, user='alice')
1317 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1318 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
1319 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1320 f53483d8 Sofia Papagiannaki
1321 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1322 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1323 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1324 f53483d8 Sofia Papagiannaki
1325 f53483d8 Sofia Papagiannaki
        # assert source object still exists
1326 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1327 f53483d8 Sofia Papagiannaki
                        self.object)
1328 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1329 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1330 f53483d8 Sofia Papagiannaki
1331 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1332 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1333 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1334 f53483d8 Sofia Papagiannaki
1335 f53483d8 Sofia Papagiannaki
        r = self.get(url)
1336 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1337 f53483d8 Sofia Papagiannaki
1338 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1339 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1340 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1341 f53483d8 Sofia Papagiannaki
1342 f53483d8 Sofia Papagiannaki
1343 f53483d8 Sofia Papagiannaki
class ObjectMove(PithosAPITest):
1344 f53483d8 Sofia Papagiannaki
    def setUp(self):
1345 f53483d8 Sofia Papagiannaki
        PithosAPITest.setUp(self)
1346 f53483d8 Sofia Papagiannaki
        self.container = 'c1'
1347 f53483d8 Sofia Papagiannaki
        self.create_container(self.container)
1348 f53483d8 Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
1349 f53483d8 Sofia Papagiannaki
1350 f53483d8 Sofia Papagiannaki
        url = join_urls(
1351 f53483d8 Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
1352 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1353 f53483d8 Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
1354 f53483d8 Sofia Papagiannaki
1355 f53483d8 Sofia Papagiannaki
    def test_move(self):
1356 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1357 f53483d8 Sofia Papagiannaki
1358 f53483d8 Sofia Papagiannaki
        # move object
1359 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1360 f53483d8 Sofia Papagiannaki
                        self.object)
1361 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1362 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1363 f53483d8 Sofia Papagiannaki
                                                   oname))
1364 f53483d8 Sofia Papagiannaki
        # assert move success
1365 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1366 f53483d8 Sofia Papagiannaki
                        oname)
1367 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1368 f53483d8 Sofia Papagiannaki
1369 f53483d8 Sofia Papagiannaki
        # assert access the new object
1370 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1371 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1372 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
1373 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1374 f53483d8 Sofia Papagiannaki
1375 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1376 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1377 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1378 f53483d8 Sofia Papagiannaki
1379 f53483d8 Sofia Papagiannaki
        # assert source object does not exist
1380 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1381 f53483d8 Sofia Papagiannaki
                        self.object)
1382 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1383 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1384 f53483d8 Sofia Papagiannaki
1385 0c6ab9df Sofia Papagiannaki
    @pithos_test_settings(API_LIST_LIMIT=10)
1386 0c6ab9df Sofia Papagiannaki
    def test_move_dir_contents(self):
1387 0c6ab9df Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1388 0c6ab9df Sofia Papagiannaki
        subfolder = self.create_folder(
1389 0c6ab9df Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1390 0c6ab9df Sofia Papagiannaki
        objects = [subfolder]
1391 0c6ab9df Sofia Papagiannaki
        append = objects.append
1392 0c6ab9df Sofia Papagiannaki
        for i in range(11):
1393 0c6ab9df Sofia Papagiannaki
            append(self.upload_object(self.container,
1394 0c6ab9df Sofia Papagiannaki
                                      '%s/%d' % (folder, i),
1395 0c6ab9df Sofia Papagiannaki
                                      depth='1')[0])
1396 0c6ab9df Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1397 0c6ab9df Sofia Papagiannaki
1398 0c6ab9df Sofia Papagiannaki
        # copy dir
1399 0c6ab9df Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1400 0c6ab9df Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
1401 0c6ab9df Sofia Papagiannaki
        r = self.move('%s?delimiter=/' % url,
1402 0c6ab9df Sofia Papagiannaki
                      HTTP_X_OBJECT_META_TEST='testcopy',
1403 0c6ab9df Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1404 0c6ab9df Sofia Papagiannaki
                                                   copy_folder))
1405 0c6ab9df Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1406 0c6ab9df Sofia Papagiannaki
1407 0c6ab9df Sofia Papagiannaki
        for obj in objects:
1408 0c6ab9df Sofia Papagiannaki
            # assert object exists
1409 0c6ab9df Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1410 0c6ab9df Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
1411 0c6ab9df Sofia Papagiannaki
            r = self.head(url)
1412 0c6ab9df Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
1413 0c6ab9df Sofia Papagiannaki
1414 0c6ab9df Sofia Papagiannaki
        # assert other has not been created under copy folder
1415 0c6ab9df Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1416 0c6ab9df Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
1417 0c6ab9df Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
1418 0c6ab9df Sofia Papagiannaki
        r = self.head(url)
1419 0c6ab9df Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1420 0c6ab9df Sofia Papagiannaki
1421 f53483d8 Sofia Papagiannaki
    def test_move_to_other_container(self):
1422 f53483d8 Sofia Papagiannaki
        # move object to other container (not existing)
1423 f53483d8 Sofia Papagiannaki
        cname = get_random_name()
1424 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1425 f53483d8 Sofia Papagiannaki
                        self.object)
1426 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1427 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1428 f53483d8 Sofia Papagiannaki
1429 f53483d8 Sofia Papagiannaki
        # assert destination container does not exist
1430 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname,
1431 f53483d8 Sofia Papagiannaki
                        self.object)
1432 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1433 f53483d8 Sofia Papagiannaki
1434 f53483d8 Sofia Papagiannaki
        # create container
1435 f53483d8 Sofia Papagiannaki
        self.create_container(cname)
1436 f53483d8 Sofia Papagiannaki
1437 f53483d8 Sofia Papagiannaki
        # move object to other container (existing)
1438 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1439 f53483d8 Sofia Papagiannaki
                        self.object)
1440 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1441 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1442 f53483d8 Sofia Papagiannaki
1443 f53483d8 Sofia Papagiannaki
        # assert move success
1444 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname,
1445 f53483d8 Sofia Papagiannaki
                        self.object)
1446 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1447 f53483d8 Sofia Papagiannaki
1448 f53483d8 Sofia Papagiannaki
        # assert access the new object
1449 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1450 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1451 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
1452 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1453 f53483d8 Sofia Papagiannaki
1454 f53483d8 Sofia Papagiannaki
        # assert etag is the same
1455 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
1456 f53483d8 Sofia Papagiannaki
        self.assertEqual(r['X-Object-Hash'], self.etag)
1457 f53483d8 Sofia Papagiannaki
1458 f53483d8 Sofia Papagiannaki
        # assert source object does not exist
1459 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1460 f53483d8 Sofia Papagiannaki
                        self.object)
1461 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1462 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1463 f53483d8 Sofia Papagiannaki
1464 f53483d8 Sofia Papagiannaki
    def test_move_to_other_account(self):
1465 f53483d8 Sofia Papagiannaki
        # create a container under alice account
1466 f53483d8 Sofia Papagiannaki
        cname = self.create_container(user='alice')[0]
1467 f53483d8 Sofia Papagiannaki
1468 f53483d8 Sofia Papagiannaki
        # create a folder under this container
1469 f53483d8 Sofia Papagiannaki
        folder = self.create_folder(cname, user='alice')[0]
1470 f53483d8 Sofia Papagiannaki
1471 f53483d8 Sofia Papagiannaki
        oname = get_random_name()
1472 f53483d8 Sofia Papagiannaki
1473 f53483d8 Sofia Papagiannaki
        # move object to other account container
1474 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1475 f53483d8 Sofia Papagiannaki
                        self.object)
1476 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1477 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1478 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1479 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1480 f53483d8 Sofia Papagiannaki
1481 f53483d8 Sofia Papagiannaki
        # share object for read with user
1482 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1483 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice', content_type='',
1484 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1485 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1486 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1487 f53483d8 Sofia Papagiannaki
1488 f53483d8 Sofia Papagiannaki
        # assert move object still is not allowed
1489 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1490 f53483d8 Sofia Papagiannaki
                        self.object)
1491 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1492 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1493 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1494 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 403)
1495 f53483d8 Sofia Papagiannaki
1496 f53483d8 Sofia Papagiannaki
        # share object for write with user
1497 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1498 f53483d8 Sofia Papagiannaki
        r = self.post(url, user='alice',  content_type='',
1499 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1500 f53483d8 Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1501 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
1502 f53483d8 Sofia Papagiannaki
1503 f53483d8 Sofia Papagiannaki
        # assert move object now is allowed
1504 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1505 f53483d8 Sofia Papagiannaki
                        self.object)
1506 f53483d8 Sofia Papagiannaki
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1507 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1508 f53483d8 Sofia Papagiannaki
                      HTTP_DESTINATION_ACCOUNT='alice')
1509 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1510 f53483d8 Sofia Papagiannaki
1511 f53483d8 Sofia Papagiannaki
        # assert source object does not exist
1512 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1513 f53483d8 Sofia Papagiannaki
                        self.object)
1514 f53483d8 Sofia Papagiannaki
        r = self.head(url)
1515 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1516 f53483d8 Sofia Papagiannaki
1517 3a19e99b Sofia Papagiannaki
1518 3a19e99b Sofia Papagiannaki
class ObjectPost(PithosAPITest):
1519 3a19e99b Sofia Papagiannaki
    def setUp(self):
1520 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
1521 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
1522 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
1523 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
1524 3a19e99b Sofia Papagiannaki
1525 3a19e99b Sofia Papagiannaki
    def test_update_meta(self):
1526 3a19e99b Sofia Papagiannaki
        with AssertUUidInvariant(self.get_object_info,
1527 3a19e99b Sofia Papagiannaki
                                 self.container,
1528 3a19e99b Sofia Papagiannaki
                                 self.object):
1529 3a19e99b Sofia Papagiannaki
            # update metadata
1530 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 256}
1531 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1532 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
1533 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
1534 3a19e99b Sofia Papagiannaki
                            self.object)
1535 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
1536 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
1537 3a19e99b Sofia Papagiannaki
1538 3a19e99b Sofia Papagiannaki
            # assert metadata have been updated
1539 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, self.object)
1540 3a19e99b Sofia Papagiannaki
1541 3a19e99b Sofia Papagiannaki
            for k, v in d.items():
1542 6ee6677e Sofia Papagiannaki
                self.assertTrue(k.title() in meta)
1543 6ee6677e Sofia Papagiannaki
                self.assertTrue(meta[k.title()], v)
1544 3a19e99b Sofia Papagiannaki
1545 3a19e99b Sofia Papagiannaki
            # Header key too large
1546 3a19e99b Sofia Papagiannaki
            d = {'a' * 115: 'b' * 256}
1547 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1548 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
1549 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
1550 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
1551 3a19e99b Sofia Papagiannaki
1552 3a19e99b Sofia Papagiannaki
            # Header value too large
1553 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 257}
1554 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1555 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
1556 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
1557 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
1558 3a19e99b Sofia Papagiannaki
1559 3a19e99b Sofia Papagiannaki
#            # Check utf-8 meta
1560 3a19e99b Sofia Papagiannaki
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1561 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1562 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
1563 3a19e99b Sofia Papagiannaki
#            url = join_urls(self.pithos_path, self.user, self.container,
1564 3a19e99b Sofia Papagiannaki
#                            self.object)
1565 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
1566 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 202)
1567 3a19e99b Sofia Papagiannaki
#
1568 3a19e99b Sofia Papagiannaki
#            # assert metadata have been updated
1569 3a19e99b Sofia Papagiannaki
#            meta = self.get_object_meta(self.container, self.object)
1570 3a19e99b Sofia Papagiannaki
#
1571 3a19e99b Sofia Papagiannaki
#            for k, v in d.items():
1572 3a19e99b Sofia Papagiannaki
#                key = 'X-Object-Meta-%s' % k.title()
1573 3a19e99b Sofia Papagiannaki
#                self.assertTrue(key in meta)
1574 3a19e99b Sofia Papagiannaki
#                self.assertTrue(meta[key], v)
1575 3a19e99b Sofia Papagiannaki
#
1576 3a19e99b Sofia Papagiannaki
#            # Header key too large
1577 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * (256 / 2)}
1578 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1579 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
1580 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
1581 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
1582 3a19e99b Sofia Papagiannaki
#
1583 3a19e99b Sofia Papagiannaki
#            # Header value too large
1584 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * 256}
1585 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1586 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
1587 3a19e99b Sofia Papagiannaki
#            r = self.udpate(url, content_type='', **kwargs)
1588 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
1589 3a19e99b Sofia Papagiannaki
1590 3a19e99b Sofia Papagiannaki
    def test_update_object(self):
1591 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1592 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1593 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
1594 42c9ed6e Sofia Papagiannaki
                block_size + 2, 2 * block_size))[:2]
1595 3a19e99b Sofia Papagiannaki
1596 3a19e99b Sofia Papagiannaki
        length = len(odata)
1597 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1598 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
1599 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1600 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1601 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1602 3a19e99b Sofia Papagiannaki
1603 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1604 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1605 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1606 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1607 3a19e99b Sofia Papagiannaki
1608 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1609 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
1610 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1611 3a19e99b Sofia Papagiannaki
                                     data)
1612 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
1613 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
1614 3a19e99b Sofia Papagiannaki
        else:
1615 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
1616 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
1617 3a19e99b Sofia Papagiannaki
1618 3a19e99b Sofia Papagiannaki
        # check modified object
1619 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1620 3a19e99b Sofia Papagiannaki
1621 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1622 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
1623 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
1624 3a19e99b Sofia Papagiannaki
1625 3a19e99b Sofia Papagiannaki
    def test_update_object_divided_by_blocksize(self):
1626 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1627 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(self.container,
1628 3a19e99b Sofia Papagiannaki
                                          length=2 * block_size)[:2]
1629 3a19e99b Sofia Papagiannaki
1630 3a19e99b Sofia Papagiannaki
        length = len(odata)
1631 3a19e99b Sofia Papagiannaki
        first_byte_pos = block_size
1632 3a19e99b Sofia Papagiannaki
        last_byte_pos = 2 * block_size - 1
1633 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1634 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1635 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1636 3a19e99b Sofia Papagiannaki
1637 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1638 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1639 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1640 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1641 3a19e99b Sofia Papagiannaki
1642 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1643 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
1644 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1645 3a19e99b Sofia Papagiannaki
                                     data)
1646 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
1647 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
1648 3a19e99b Sofia Papagiannaki
        else:
1649 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
1650 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
1651 3a19e99b Sofia Papagiannaki
1652 3a19e99b Sofia Papagiannaki
        # check modified object
1653 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1654 3a19e99b Sofia Papagiannaki
1655 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1656 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
1657 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
1658 3a19e99b Sofia Papagiannaki
1659 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_content_length(self):
1660 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1661 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1662 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
1663 3a19e99b Sofia Papagiannaki
                block_size + 1, 2 * block_size))[:2]
1664 3a19e99b Sofia Papagiannaki
1665 3a19e99b Sofia Papagiannaki
        length = len(odata)
1666 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1667 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
1668 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1669 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1670 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1671 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1672 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range,
1673 6ee6677e Sofia Papagiannaki
                  'CONTENT_LENGTH': str(partial + 1)}
1674 3a19e99b Sofia Papagiannaki
1675 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1676 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1677 3a19e99b Sofia Papagiannaki
1678 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1679 3a19e99b Sofia Papagiannaki
1680 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_range(self):
1681 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1682 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1683 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1684 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1685 3a19e99b Sofia Papagiannaki
1686 3a19e99b Sofia Papagiannaki
        length = len(odata)
1687 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1688 3a19e99b Sofia Papagiannaki
        last_byte_pos = first_byte_pos - 1
1689 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1690 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1691 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1692 3a19e99b Sofia Papagiannaki
1693 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1694 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1695 3a19e99b Sofia Papagiannaki
1696 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1697 3a19e99b Sofia Papagiannaki
1698 3a19e99b Sofia Papagiannaki
    def test_update_object_out_of_limits(self):
1699 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1700 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1701 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1702 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1703 3a19e99b Sofia Papagiannaki
1704 3a19e99b Sofia Papagiannaki
        length = len(odata)
1705 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1706 3a19e99b Sofia Papagiannaki
        last_byte_pos = length + 1
1707 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1708 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1709 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1710 3a19e99b Sofia Papagiannaki
1711 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1712 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1713 3a19e99b Sofia Papagiannaki
1714 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1715 3a19e99b Sofia Papagiannaki
1716 3a19e99b Sofia Papagiannaki
    def test_append(self):
1717 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1718 5fe43b8c Sofia Papagiannaki
        length = len(data)
1719 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1720 3a19e99b Sofia Papagiannaki
                        self.object)
1721 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1722 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_LENGTH=str(length),
1723 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*')
1724 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1725 3a19e99b Sofia Papagiannaki
1726 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1727 3a19e99b Sofia Papagiannaki
        content = r.content
1728 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(content), len(self.object_data) + length)
1729 3a19e99b Sofia Papagiannaki
        self.assertEqual(content, self.object_data + data)
1730 3a19e99b Sofia Papagiannaki
1731 5fe43b8c Sofia Papagiannaki
    # TODO Fix the test
1732 5fe43b8c Sofia Papagiannaki
    def _test_update_with_chunked_transfer(self):
1733 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1734 5fe43b8c Sofia Papagiannaki
        length = len(data)
1735 3a19e99b Sofia Papagiannaki
1736 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1737 3a19e99b Sofia Papagiannaki
                        self.object)
1738 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1739 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1740 3a19e99b Sofia Papagiannaki
                      HTTP_TRANSFER_ENCODING='chunked')
1741 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1742 3a19e99b Sofia Papagiannaki
1743 3a19e99b Sofia Papagiannaki
        # check modified object
1744 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1745 3a19e99b Sofia Papagiannaki
        content = r.content
1746 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[0:length], data)
1747 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[length:], self.object_data[length:])
1748 3a19e99b Sofia Papagiannaki
1749 3a19e99b Sofia Papagiannaki
    def test_update_from_other_object(self):
1750 3a19e99b Sofia Papagiannaki
        src = self.object
1751 bc4f25c0 Sofia Papagiannaki
        dest = get_random_name()
1752 3a19e99b Sofia Papagiannaki
1753 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1754 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1755 3a19e99b Sofia Papagiannaki
        source_data = r.content
1756 3a19e99b Sofia Papagiannaki
        source_meta = self.get_object_info(self.container, src)
1757 3a19e99b Sofia Papagiannaki
1758 3a19e99b Sofia Papagiannaki
        # update zero length object
1759 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1760 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
1761 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1762 3a19e99b Sofia Papagiannaki
1763 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1764 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1765 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1766 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1767 3a19e99b Sofia Papagiannaki
1768 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1769 3a19e99b Sofia Papagiannaki
        dest_data = r.content
1770 3a19e99b Sofia Papagiannaki
        dest_meta = self.get_object_info(self.container, dest)
1771 3a19e99b Sofia Papagiannaki
1772 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_data, dest_data)
1773 3a19e99b Sofia Papagiannaki
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1774 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_meta['X-Object-Hash'],
1775 3a19e99b Sofia Papagiannaki
                         dest_meta['X-Object-Hash'])
1776 3a19e99b Sofia Papagiannaki
        self.assertTrue(
1777 3a19e99b Sofia Papagiannaki
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1778 3a19e99b Sofia Papagiannaki
1779 3a19e99b Sofia Papagiannaki
    def test_update_range_from_other_object(self):
1780 3a19e99b Sofia Papagiannaki
        src = self.object
1781 bc4f25c0 Sofia Papagiannaki
        dest = get_random_name()
1782 3a19e99b Sofia Papagiannaki
1783 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1784 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1785 3a19e99b Sofia Papagiannaki
        source_data = r.content
1786 f4b0dbd8 Sofia Papagiannaki
        source_length = len(source_data)
1787 3a19e99b Sofia Papagiannaki
1788 3a19e99b Sofia Papagiannaki
        # update zero length object
1789 c977b0b6 Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1790 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1791 c977b0b6 Sofia Papagiannaki
        initial_data = get_random_data(
1792 c977b0b6 Sofia Papagiannaki
            length=source_length + random.randint(0, block_size))
1793 c977b0b6 Sofia Papagiannaki
1794 a1f429b2 Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1795 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1796 3a19e99b Sofia Papagiannaki
1797 f4b0dbd8 Sofia Papagiannaki
        offset = random.randint(0, source_length - 1)
1798 f4b0dbd8 Sofia Papagiannaki
        upto = random.randint(offset, source_length - 1)
1799 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1800 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1801 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1802 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1803 3a19e99b Sofia Papagiannaki
1804 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1805 3a19e99b Sofia Papagiannaki
        content = r.content
1806 a1f429b2 Sofia Papagiannaki
        self.assertEqual(content, (initial_data[:offset] +
1807 a1f429b2 Sofia Papagiannaki
                                   source_data[:upto - offset + 1] +
1808 a1f429b2 Sofia Papagiannaki
                                   initial_data[upto + 1:]))
1809 3a19e99b Sofia Papagiannaki
1810 e4c7cbeb Sofia Papagiannaki
    def test_update_range_from_invalid_other_object(self):
1811 e4c7cbeb Sofia Papagiannaki
        src = self.object
1812 e4c7cbeb Sofia Papagiannaki
        dest = get_random_name()
1813 e4c7cbeb Sofia Papagiannaki
1814 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1815 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1816 e4c7cbeb Sofia Papagiannaki
1817 e4c7cbeb Sofia Papagiannaki
        # update zero length object
1818 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1819 e4c7cbeb Sofia Papagiannaki
        initial_data = get_random_data()
1820 e4c7cbeb Sofia Papagiannaki
        length = len(initial_data)
1821 e4c7cbeb Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1822 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1823 e4c7cbeb Sofia Papagiannaki
1824 e4c7cbeb Sofia Papagiannaki
        offset = random.randint(1, length - 2)
1825 e4c7cbeb Sofia Papagiannaki
        upto = random.randint(offset, length - 1)
1826 e4c7cbeb Sofia Papagiannaki
1827 e4c7cbeb Sofia Papagiannaki
        # source object does not start with /
1828 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1829 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1830 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1831 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1832 e4c7cbeb Sofia Papagiannaki
1833 e4c7cbeb Sofia Papagiannaki
        # source object does not exist
1834 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1835 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1836 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1837 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1838 e4c7cbeb Sofia Papagiannaki
1839 00064fb8 Sofia Papagiannaki
    def test_restore_version(self):
1840 e4c7cbeb Sofia Papagiannaki
        info = self.get_object_info(self.container, self.object)
1841 00064fb8 Sofia Papagiannaki
        v = []
1842 00064fb8 Sofia Papagiannaki
        append = v.append
1843 00064fb8 Sofia Papagiannaki
        append((info['X-Object-Version'],
1844 00064fb8 Sofia Papagiannaki
                int(info['Content-Length']),
1845 00064fb8 Sofia Papagiannaki
                self.object_data))
1846 e4c7cbeb Sofia Papagiannaki
1847 e4c7cbeb Sofia Papagiannaki
        # update object
1848 00064fb8 Sofia Papagiannaki
        data, r = self.upload_object(self.container, self.object,
1849 00064fb8 Sofia Papagiannaki
                                     length=v[0][1] - 1)[1:]
1850 e4c7cbeb Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1851 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1852 00064fb8 Sofia Papagiannaki
        # v[0][1] > v[1][1]
1853 e4c7cbeb Sofia Papagiannaki
1854 00064fb8 Sofia Papagiannaki
        # update with the previous version
1855 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1856 e4c7cbeb Sofia Papagiannaki
                        self.object)
1857 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1858 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1859 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1860 e4c7cbeb Sofia Papagiannaki
                                                       self.object),
1861 00064fb8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=v[0][0],
1862 00064fb8 Sofia Papagiannaki
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1863 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1864 00064fb8 Sofia Papagiannaki
        # v[2][1] = v[0][1] > v[1][1]
1865 e4c7cbeb Sofia Papagiannaki
1866 e4c7cbeb Sofia Papagiannaki
        # check content
1867 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1868 e4c7cbeb Sofia Papagiannaki
        content = r.content
1869 00064fb8 Sofia Papagiannaki
        self.assertEqual(len(content), v[0][1])
1870 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(content, self.object_data)
1871 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(content), content))
1872 e4c7cbeb Sofia Papagiannaki
1873 00064fb8 Sofia Papagiannaki
        # update object content(v4) > content(v2)
1874 00064fb8 Sofia Papagiannaki
        data, r = self.upload_object(self.container, self.object,
1875 00064fb8 Sofia Papagiannaki
                                     length=v[2][1] + 1)[1:]
1876 e4c7cbeb Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1877 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1878 00064fb8 Sofia Papagiannaki
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1879 e4c7cbeb Sofia Papagiannaki
1880 e4c7cbeb Sofia Papagiannaki
        # update with the previous version
1881 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1882 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1883 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1884 e4c7cbeb Sofia Papagiannaki
                                                       self.object),
1885 00064fb8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=v[2][0],
1886 00064fb8 Sofia Papagiannaki
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1887 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1888 00064fb8 Sofia Papagiannaki
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1889 e4c7cbeb Sofia Papagiannaki
1890 e4c7cbeb Sofia Papagiannaki
        # check content
1891 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1892 00064fb8 Sofia Papagiannaki
        data = r.content
1893 00064fb8 Sofia Papagiannaki
        self.assertEqual(data, v[2][2])
1894 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1895 e4c7cbeb Sofia Papagiannaki
1896 f53483d8 Sofia Papagiannaki
    def test_update_from_other_version(self):
1897 f53483d8 Sofia Papagiannaki
        versions = []
1898 f53483d8 Sofia Papagiannaki
        info = self.get_object_info(self.container, self.object)
1899 f53483d8 Sofia Papagiannaki
        versions.append(info['X-Object-Version'])
1900 f53483d8 Sofia Papagiannaki
        pre_length = int(info['Content-Length'])
1901 f53483d8 Sofia Papagiannaki
1902 f53483d8 Sofia Papagiannaki
        # update object
1903 f53483d8 Sofia Papagiannaki
        d1, r = self.upload_object(self.container, self.object,
1904 f53483d8 Sofia Papagiannaki
                                   length=pre_length - 1)[1:]
1905 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1906 f53483d8 Sofia Papagiannaki
        versions.append(r['X-Object-Version'])
1907 f53483d8 Sofia Papagiannaki
1908 f53483d8 Sofia Papagiannaki
        # update object
1909 f53483d8 Sofia Papagiannaki
        d2, r = self.upload_object(self.container, self.object,
1910 f53483d8 Sofia Papagiannaki
                                   length=pre_length - 2)[1:]
1911 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1912 f53483d8 Sofia Papagiannaki
        versions.append(r['X-Object-Version'])
1913 f53483d8 Sofia Papagiannaki
1914 f53483d8 Sofia Papagiannaki
        # get previous version
1915 f53483d8 Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1916 f53483d8 Sofia Papagiannaki
                        self.object)
1917 f53483d8 Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
1918 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1919 f53483d8 Sofia Papagiannaki
        l = json.loads(r.content)['versions']
1920 f53483d8 Sofia Papagiannaki
        self.assertEqual(len(l), 3)
1921 f53483d8 Sofia Papagiannaki
        self.assertEqual([str(v[0]) for v in l], versions)
1922 f53483d8 Sofia Papagiannaki
1923 f53483d8 Sofia Papagiannaki
        # update with the previous version
1924 f53483d8 Sofia Papagiannaki
        r = self.post(url,
1925 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1926 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1927 f53483d8 Sofia Papagiannaki
                                                       self.object),
1928 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=versions[0])
1929 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1930 f53483d8 Sofia Papagiannaki
1931 f53483d8 Sofia Papagiannaki
        # check content
1932 f53483d8 Sofia Papagiannaki
        r = self.get(url)
1933 f53483d8 Sofia Papagiannaki
        content = r.content
1934 f53483d8 Sofia Papagiannaki
        self.assertEqual(len(content), pre_length)
1935 f53483d8 Sofia Papagiannaki
        self.assertEqual(content, self.object_data)
1936 f53483d8 Sofia Papagiannaki
1937 f53483d8 Sofia Papagiannaki
        # update object
1938 f53483d8 Sofia Papagiannaki
        d3, r = self.upload_object(self.container, self.object,
1939 f53483d8 Sofia Papagiannaki
                                   length=len(d2) + 1)[1:]
1940 f53483d8 Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1941 f53483d8 Sofia Papagiannaki
        versions.append(r['X-Object-Version'])
1942 f53483d8 Sofia Papagiannaki
1943 f53483d8 Sofia Papagiannaki
        # update with the previous version
1944 f53483d8 Sofia Papagiannaki
        r = self.post(url,
1945 f53483d8 Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1946 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1947 f53483d8 Sofia Papagiannaki
                                                       self.object),
1948 f53483d8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=versions[-2])
1949 f53483d8 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1950 f53483d8 Sofia Papagiannaki
1951 f53483d8 Sofia Papagiannaki
        # check content
1952 f53483d8 Sofia Papagiannaki
        r = self.get(url)
1953 f53483d8 Sofia Papagiannaki
        content = r.content
1954 f53483d8 Sofia Papagiannaki
        self.assertEqual(content, d2 + d3[-1])
1955 f53483d8 Sofia Papagiannaki
1956 6c9df07c Sofia Papagiannaki
    def test_update_invalid_permissions(self):
1957 6c9df07c Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1958 6c9df07c Sofia Papagiannaki
                        self.object)
1959 6c9df07c Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1960 6c9df07c Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1961 6c9df07c Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1962 6c9df07c Sofia Papagiannaki
1963 6c9df07c Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1964 6c9df07c Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1965 6c9df07c Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1966 6c9df07c Sofia Papagiannaki
1967 6c9df07c Sofia Papagiannaki
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1968 6c9df07c Sofia Papagiannaki
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1969 6c9df07c Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1970 3a19e99b Sofia Papagiannaki
1971 f019f93d Ilias Tsitsimpis
1972 3a19e99b Sofia Papagiannaki
class ObjectDelete(PithosAPITest):
1973 3a19e99b Sofia Papagiannaki
    def setUp(self):
1974 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
1975 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
1976 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
1977 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
1978 3a19e99b Sofia Papagiannaki
1979 3a19e99b Sofia Papagiannaki
    def test_delete(self):
1980 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1981 3a19e99b Sofia Papagiannaki
                        self.object)
1982 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1983 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1984 3a19e99b Sofia Papagiannaki
1985 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1986 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1987 3a19e99b Sofia Papagiannaki
1988 3a19e99b Sofia Papagiannaki
    def test_delete_non_existent(self):
1989 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1990 bc4f25c0 Sofia Papagiannaki
                        get_random_name())
1991 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1992 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1993 3a19e99b Sofia Papagiannaki
1994 0c6ab9df Sofia Papagiannaki
    @pithos_test_settings(API_LIST_LIMIT=10)
1995 3a19e99b Sofia Papagiannaki
    def test_delete_dir(self):
1996 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1997 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
1998 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1999 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
2000 3a19e99b Sofia Papagiannaki
        append = objects.append
2001 0c6ab9df Sofia Papagiannaki
        for i in range(11):
2002 0c6ab9df Sofia Papagiannaki
            append(self.upload_object(self.container,
2003 0c6ab9df Sofia Papagiannaki
                                      '%s/%d' % (folder, i),
2004 0c6ab9df Sofia Papagiannaki
                                      depth='1')[0])
2005 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
2006 3a19e99b Sofia Papagiannaki
2007 3a19e99b Sofia Papagiannaki
        # move dir
2008 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, folder)
2009 3a19e99b Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url)
2010 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
2011 3a19e99b Sofia Papagiannaki
2012 3a19e99b Sofia Papagiannaki
        for obj in objects:
2013 3a19e99b Sofia Papagiannaki
            # assert object does not exist
2014 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
2015 3a19e99b Sofia Papagiannaki
            r = self.head(url)
2016 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
2017 3a19e99b Sofia Papagiannaki
2018 3a19e99b Sofia Papagiannaki
        # assert other has not been deleted
2019 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, other)
2020 3a19e99b Sofia Papagiannaki
        r = self.head(url)
2021 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)