Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 800af189

History | View | Annotate | Download (45.1 kB)

1 3a19e99b Sofia Papagiannaki
#!/usr/bin/env python
2 3a19e99b Sofia Papagiannaki
#coding=utf8
3 3a19e99b Sofia Papagiannaki
4 3a19e99b Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 3a19e99b Sofia Papagiannaki
#
6 3a19e99b Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 3a19e99b Sofia Papagiannaki
# without modification, are permitted provided that the following
8 3a19e99b Sofia Papagiannaki
# conditions are met:
9 3a19e99b Sofia Papagiannaki
#
10 3a19e99b Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 3a19e99b Sofia Papagiannaki
#      disclaimer.
13 3a19e99b Sofia Papagiannaki
#
14 3a19e99b Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 3a19e99b Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 3a19e99b Sofia Papagiannaki
#      provided with the distribution.
18 3a19e99b Sofia Papagiannaki
#
19 3a19e99b Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 3a19e99b Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 3a19e99b Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 3a19e99b Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 3a19e99b Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 3a19e99b Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 3a19e99b Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 3a19e99b Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 3a19e99b Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 3a19e99b Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 3a19e99b Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 3a19e99b Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 3a19e99b Sofia Papagiannaki
#
32 3a19e99b Sofia Papagiannaki
# The views and conclusions contained in the software and
33 3a19e99b Sofia Papagiannaki
# documentation are those of the authors and should not be
34 3a19e99b Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 3a19e99b Sofia Papagiannaki
# or implied, of GRNET S.A.
36 3a19e99b Sofia Papagiannaki
37 3a19e99b Sofia Papagiannaki
from collections import defaultdict
38 3a19e99b Sofia Papagiannaki
from urllib import quote
39 5fe43b8c Sofia Papagiannaki
from functools import partial
40 3a19e99b Sofia Papagiannaki
41 3a19e99b Sofia Papagiannaki
from pithos.api.test import (PithosAPITest, pithos_settings,
42 3a19e99b Sofia Papagiannaki
                             AssertMappingInvariant, AssertUUidInvariant,
43 5fe43b8c Sofia Papagiannaki
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44 3a19e99b Sofia Papagiannaki
                             DATE_FORMATS)
45 5fe43b8c Sofia Papagiannaki
from pithos.api.test.util import md5_hash, merkle, strnextling, get_random_data
46 3a19e99b Sofia Papagiannaki
47 3a19e99b Sofia Papagiannaki
from synnefo.lib import join_urls
48 3a19e99b Sofia Papagiannaki
49 3a19e99b Sofia Papagiannaki
import django.utils.simplejson as json
50 3a19e99b Sofia Papagiannaki
51 3a19e99b Sofia Papagiannaki
import random
52 3a19e99b Sofia Papagiannaki
import re
53 3a19e99b Sofia Papagiannaki
import datetime
54 5fe43b8c Sofia Papagiannaki
import time as _time
55 5fe43b8c Sofia Papagiannaki
56 5fe43b8c Sofia Papagiannaki
merkle = partial(merkle,
57 5fe43b8c Sofia Papagiannaki
                 blocksize=TEST_BLOCK_SIZE,
58 5fe43b8c Sofia Papagiannaki
                 blockhash=TEST_HASH_ALGORITHM)
59 3a19e99b Sofia Papagiannaki
60 3a19e99b Sofia Papagiannaki
61 3a19e99b Sofia Papagiannaki
class ObjectGet(PithosAPITest):
62 3a19e99b Sofia Papagiannaki
    def setUp(self):
63 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
64 3a19e99b Sofia Papagiannaki
        self.containers = ['c1', 'c2']
65 3a19e99b Sofia Papagiannaki
66 3a19e99b Sofia Papagiannaki
        # create some containers
67 3a19e99b Sofia Papagiannaki
        for c in self.containers:
68 3a19e99b Sofia Papagiannaki
            self.create_container(c)
69 3a19e99b Sofia Papagiannaki
70 3a19e99b Sofia Papagiannaki
        # upload files
71 3a19e99b Sofia Papagiannaki
        self.objects = defaultdict(list)
72 3a19e99b Sofia Papagiannaki
        self.objects['c1'].append(self.upload_object('c1')[0])
73 3a19e99b Sofia Papagiannaki
74 3a19e99b Sofia Papagiannaki
    def test_versions(self):
75 3a19e99b Sofia Papagiannaki
        c = 'c1'
76 3a19e99b Sofia Papagiannaki
        o = self.objects[c][0]
77 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
78 3a19e99b Sofia Papagiannaki
79 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
80 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
81 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
82 3a19e99b Sofia Papagiannaki
83 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
84 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
85 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
86 3a19e99b Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
87 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
88 3a19e99b Sofia Papagiannaki
89 3a19e99b Sofia Papagiannaki
        # update meta
90 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
91 3a19e99b Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
92 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
93 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
94 3a19e99b Sofia Papagiannaki
95 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
96 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
97 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
98 3a19e99b Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
99 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
100 3a19e99b Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
101 3a19e99b Sofia Papagiannaki
102 3a19e99b Sofia Papagiannaki
        vserial, _ = l2[-2]
103 3a19e99b Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
104 3a19e99b Sofia Papagiannaki
                         {'X-Object-Meta-Quality': 'AAA'})
105 3a19e99b Sofia Papagiannaki
106 3a19e99b Sofia Papagiannaki
        # update data
107 3a19e99b Sofia Papagiannaki
        self.append_object_data(c, o)
108 3a19e99b Sofia Papagiannaki
109 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
110 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
111 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
112 3a19e99b Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
113 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
114 3a19e99b Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
115 3a19e99b Sofia Papagiannaki
116 3a19e99b Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
117 3a19e99b Sofia Papagiannaki
        # create object
118 3a19e99b Sofia Papagiannaki
        oname = self.upload_object('c1')[0]
119 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
120 3a19e99b Sofia Papagiannaki
121 3a19e99b Sofia Papagiannaki
        r = self.get(quote('%s ' % url))
122 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
123 3a19e99b Sofia Papagiannaki
124 3a19e99b Sofia Papagiannaki
        # delete object
125 3a19e99b Sofia Papagiannaki
        self.delete(url)
126 3a19e99b Sofia Papagiannaki
127 3a19e99b Sofia Papagiannaki
        r = self.get(url)
128 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
129 3a19e99b Sofia Papagiannaki
130 3a19e99b Sofia Papagiannaki
        # upload object with trailing space
131 3a19e99b Sofia Papagiannaki
        oname = self.upload_object('c1', quote('%s ' % get_random_data(8)))[0]
132 3a19e99b Sofia Papagiannaki
133 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
134 3a19e99b Sofia Papagiannaki
        r = self.get(url)
135 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
136 3a19e99b Sofia Papagiannaki
137 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
138 3a19e99b Sofia Papagiannaki
        r = self.get(url)
139 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
140 3a19e99b Sofia Papagiannaki
141 3a19e99b Sofia Papagiannaki
    def test_get_partial(self):
142 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
143 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
144 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
145 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-499')
146 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
147 3a19e99b Sofia Papagiannaki
        data = r.content
148 3a19e99b Sofia Papagiannaki
        self.assertEqual(data, odata[:500])
149 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
150 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
151 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
152 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
153 3a19e99b Sofia Papagiannaki
154 3a19e99b Sofia Papagiannaki
    def test_get_final_500(self):
155 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
156 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
157 3a19e99b Sofia Papagiannaki
        size = len(odata)
158 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
159 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=-500')
160 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
161 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[-500:])
162 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
163 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
164 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
165 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
166 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
167 3a19e99b Sofia Papagiannaki
168 3a19e99b Sofia Papagiannaki
    def test_get_rest(self):
169 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
170 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
171 3a19e99b Sofia Papagiannaki
        size = len(odata)
172 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
173 3a19e99b Sofia Papagiannaki
        offset = len(odata) - random.randint(1, 512)
174 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
175 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
176 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[offset:])
177 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
178 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
179 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (offset, size - 1, size))
180 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
181 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
182 3a19e99b Sofia Papagiannaki
183 3a19e99b Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
184 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
185 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
186 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
187 3a19e99b Sofia Papagiannaki
188 3a19e99b Sofia Papagiannaki
        # TODO
189 3a19e99b Sofia Papagiannaki
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
190 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
191 3a19e99b Sofia Papagiannaki
192 3a19e99b Sofia Papagiannaki
        offset = len(odata) + 1
193 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
194 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
195 3a19e99b Sofia Papagiannaki
196 3a19e99b Sofia Papagiannaki
    def test_multiple_range(self):
197 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
198 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
199 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
200 3a19e99b Sofia Papagiannaki
201 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
202 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
203 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
204 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
205 3a19e99b Sofia Papagiannaki
        self.assertTrue('content-type' in r)
206 3a19e99b Sofia Papagiannaki
        p = re.compile(
207 3a19e99b Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
208 3a19e99b Sofia Papagiannaki
            re.I)
209 3a19e99b Sofia Papagiannaki
        m = p.match(r['content-type'])
210 3a19e99b Sofia Papagiannaki
        if m is None:
211 3a19e99b Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
212 3a19e99b Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
213 3a19e99b Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
214 3a19e99b Sofia Papagiannaki
215 3a19e99b Sofia Papagiannaki
        # assert content parts length
216 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
217 3a19e99b Sofia Papagiannaki
218 3a19e99b Sofia Papagiannaki
        # for each content part assert headers
219 3a19e99b Sofia Papagiannaki
        i = 0
220 3a19e99b Sofia Papagiannaki
        for cpart in cparts:
221 3a19e99b Sofia Papagiannaki
            content = cpart.split('\r\n')
222 3a19e99b Sofia Papagiannaki
            headers = content[1:3]
223 3a19e99b Sofia Papagiannaki
            content_range = headers[0].split(': ')
224 3a19e99b Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
225 3a19e99b Sofia Papagiannaki
226 3a19e99b Sofia Papagiannaki
            r = l[i].split('-')
227 3a19e99b Sofia Papagiannaki
            if not r[0] and not r[1]:
228 3a19e99b Sofia Papagiannaki
                pass
229 3a19e99b Sofia Papagiannaki
            elif not r[0]:
230 3a19e99b Sofia Papagiannaki
                start = len(odata) - int(r[1])
231 3a19e99b Sofia Papagiannaki
                end = len(odata)
232 3a19e99b Sofia Papagiannaki
            elif not r[1]:
233 3a19e99b Sofia Papagiannaki
                start = int(r[0])
234 3a19e99b Sofia Papagiannaki
                end = len(odata)
235 3a19e99b Sofia Papagiannaki
            else:
236 3a19e99b Sofia Papagiannaki
                start = int(r[0])
237 3a19e99b Sofia Papagiannaki
                end = int(r[1]) + 1
238 3a19e99b Sofia Papagiannaki
            fdata = odata[start:end]
239 3a19e99b Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
240 3a19e99b Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
241 3a19e99b Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
242 3a19e99b Sofia Papagiannaki
            i += 1
243 3a19e99b Sofia Papagiannaki
244 3a19e99b Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
245 3a19e99b Sofia Papagiannaki
        # perform get with multiple range
246 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
247 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
248 3a19e99b Sofia Papagiannaki
        out_of_range = len(odata) + 1
249 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
250 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
251 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
252 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
253 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
254 3a19e99b Sofia Papagiannaki
255 5fe43b8c Sofia Papagiannaki
    def test_get_if_match(self):
256 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
257 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
258 3a19e99b Sofia Papagiannaki
259 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
260 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
261 3a19e99b Sofia Papagiannaki
262 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
263 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
264 3a19e99b Sofia Papagiannaki
        else:
265 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
266 3a19e99b Sofia Papagiannaki
267 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=etag)
268 3a19e99b Sofia Papagiannaki
269 3a19e99b Sofia Papagiannaki
        # assert get success
270 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
271 3a19e99b Sofia Papagiannaki
272 3a19e99b Sofia Papagiannaki
        # assert response content
273 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
274 3a19e99b Sofia Papagiannaki
275 5fe43b8c Sofia Papagiannaki
    def test_get_if_match_star(self):
276 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
277 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
278 3a19e99b Sofia Papagiannaki
279 3a19e99b Sofia Papagiannaki
        # perform get with If-Match *
280 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
281 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH='*')
282 3a19e99b Sofia Papagiannaki
283 3a19e99b Sofia Papagiannaki
        # assert get success
284 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
285 3a19e99b Sofia Papagiannaki
286 3a19e99b Sofia Papagiannaki
        # assert response content
287 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
288 3a19e99b Sofia Papagiannaki
289 5fe43b8c Sofia Papagiannaki
    def test_get_multiple_if_match(self):
290 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
291 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
292 3a19e99b Sofia Papagiannaki
293 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
294 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
295 3a19e99b Sofia Papagiannaki
296 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
297 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
298 3a19e99b Sofia Papagiannaki
        else:
299 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
300 3a19e99b Sofia Papagiannaki
301 5fe43b8c Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
302 5fe43b8c Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=','.join(
303 5fe43b8c Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
304 3a19e99b Sofia Papagiannaki
305 3a19e99b Sofia Papagiannaki
        # assert get success
306 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
307 3a19e99b Sofia Papagiannaki
308 3a19e99b Sofia Papagiannaki
        # assert response content
309 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
310 3a19e99b Sofia Papagiannaki
311 3a19e99b Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
312 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
313 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
314 3a19e99b Sofia Papagiannaki
315 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
316 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
317 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=get_random_data(8))
318 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
319 3a19e99b Sofia Papagiannaki
320 a1f429b2 Sofia Papagiannaki
    def test_if_none_match(self):
321 3a19e99b Sofia Papagiannaki
        # upload object
322 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
323 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
324 3a19e99b Sofia Papagiannaki
325 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
326 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
327 3a19e99b Sofia Papagiannaki
        else:
328 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
329 3a19e99b Sofia Papagiannaki
330 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
331 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
332 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
333 3a19e99b Sofia Papagiannaki
334 3a19e99b Sofia Papagiannaki
        # assert precondition_failed
335 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
336 3a19e99b Sofia Papagiannaki
337 3a19e99b Sofia Papagiannaki
        # update object data
338 3a19e99b Sofia Papagiannaki
        r = self.append_object_data(cname, oname)[-1]
339 3a19e99b Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
340 3a19e99b Sofia Papagiannaki
341 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
342 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
343 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
344 3a19e99b Sofia Papagiannaki
345 3a19e99b Sofia Papagiannaki
        # assert get success
346 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
347 3a19e99b Sofia Papagiannaki
348 a1f429b2 Sofia Papagiannaki
    def test_if_none_match_star(self):
349 3a19e99b Sofia Papagiannaki
        # upload object
350 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
351 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
352 3a19e99b Sofia Papagiannaki
353 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match with star
354 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
355 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
356 3a19e99b Sofia Papagiannaki
357 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
358 3a19e99b Sofia Papagiannaki
359 3a19e99b Sofia Papagiannaki
    def test_if_modified_since(self):
360 3a19e99b Sofia Papagiannaki
        # upload object
361 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
362 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
363 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
364 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
365 3a19e99b Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
366 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
367 3a19e99b Sofia Papagiannaki
368 3a19e99b Sofia Papagiannaki
        # Check not modified since
369 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
370 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
371 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
372 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
373 3a19e99b Sofia Papagiannaki
374 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
375 3a19e99b Sofia Papagiannaki
376 3a19e99b Sofia Papagiannaki
        # update object data
377 3a19e99b Sofia Papagiannaki
        appended_data = self.append_object_data(cname, oname)[1]
378 3a19e99b Sofia Papagiannaki
379 3a19e99b Sofia Papagiannaki
        # Check modified since
380 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
381 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
382 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
383 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
384 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata + appended_data)
385 3a19e99b Sofia Papagiannaki
386 3a19e99b Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
387 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
388 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
389 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
390 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
391 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
392 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
393 3a19e99b Sofia Papagiannaki
394 3a19e99b Sofia Papagiannaki
    def test_if_not_modified_since(self):
395 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
396 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
397 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
398 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
399 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
400 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
401 3a19e99b Sofia Papagiannaki
402 3a19e99b Sofia Papagiannaki
        # Check unmodified
403 3a19e99b Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
404 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
405 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
406 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
407 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
408 3a19e99b Sofia Papagiannaki
            self.assertEqual(odata, odata)
409 3a19e99b Sofia Papagiannaki
410 3a19e99b Sofia Papagiannaki
        # modify object
411 3a19e99b Sofia Papagiannaki
        _time.sleep(2)
412 3a19e99b Sofia Papagiannaki
        self.append_object_data(cname, oname)
413 3a19e99b Sofia Papagiannaki
414 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
415 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
416 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
417 3a19e99b Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
418 3a19e99b Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
419 3a19e99b Sofia Papagiannaki
420 5fe43b8c Sofia Papagiannaki
        # check modified
421 3a19e99b Sofia Papagiannaki
        for t in t2_formats:
422 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
423 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
424 3a19e99b Sofia Papagiannaki
425 800af189 Sofia Papagiannaki
        # modify account: update object meta
426 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
427 3a19e99b Sofia Papagiannaki
        self.update_object_meta(cname, oname, {'foo': 'bar'})
428 3a19e99b Sofia Papagiannaki
429 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
430 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
431 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
432 3a19e99b Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
433 3a19e99b Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
434 3a19e99b Sofia Papagiannaki
435 5fe43b8c Sofia Papagiannaki
        # check modified
436 3a19e99b Sofia Papagiannaki
        for t in t3_formats:
437 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
438 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
439 3a19e99b Sofia Papagiannaki
440 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since(self):
441 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
442 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
443 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
444 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
445 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
446 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
447 3a19e99b Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
448 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
449 3a19e99b Sofia Papagiannaki
450 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
451 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
452 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
453 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata)
454 3a19e99b Sofia Papagiannaki
455 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
456 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
457 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
458 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
459 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
460 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
461 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
462 3a19e99b Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
463 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
464 3a19e99b Sofia Papagiannaki
465 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
466 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
467 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
468 3a19e99b Sofia Papagiannaki
469 3a19e99b Sofia Papagiannaki
    def test_hashes(self):
470 3a19e99b Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
471 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
472 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=l)[:-1]
473 3a19e99b Sofia Papagiannaki
        size = len(odata)
474 3a19e99b Sofia Papagiannaki
475 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
476 3a19e99b Sofia Papagiannaki
        r = self.get('%s?format=json&hashmap' % url)
477 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
478 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
479 3a19e99b Sofia Papagiannaki
480 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
481 3a19e99b Sofia Papagiannaki
        block_size = body['block_size']
482 3a19e99b Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
483 3a19e99b Sofia Papagiannaki
            size / block_size + 1
484 3a19e99b Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
485 3a19e99b Sofia Papagiannaki
        i = 0
486 3a19e99b Sofia Papagiannaki
        for h in hashes:
487 3a19e99b Sofia Papagiannaki
            start = i * block_size
488 3a19e99b Sofia Papagiannaki
            end = (i + 1) * block_size
489 5fe43b8c Sofia Papagiannaki
            hash = merkle(odata[start:end])
490 3a19e99b Sofia Papagiannaki
            self.assertEqual(h, hash)
491 3a19e99b Sofia Papagiannaki
            i += 1
492 3a19e99b Sofia Papagiannaki
493 3a19e99b Sofia Papagiannaki
494 3a19e99b Sofia Papagiannaki
class ObjectPut(PithosAPITest):
495 3a19e99b Sofia Papagiannaki
    def setUp(self):
496 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
497 3a19e99b Sofia Papagiannaki
        self.container = get_random_data(8)
498 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
499 3a19e99b Sofia Papagiannaki
500 3a19e99b Sofia Papagiannaki
    def test_upload(self):
501 3a19e99b Sofia Papagiannaki
        cname = self.container
502 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
503 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
504 3a19e99b Sofia Papagiannaki
        meta = {'test': 'test1'}
505 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
506 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
507 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
508 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, content_type='application/pdf', **headers)
509 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
510 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
511 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
512 3a19e99b Sofia Papagiannaki
513 3a19e99b Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
514 3a19e99b Sofia Papagiannaki
515 3a19e99b Sofia Papagiannaki
        # assert object meta
516 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in info)
517 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
518 3a19e99b Sofia Papagiannaki
519 3a19e99b Sofia Papagiannaki
        # assert content-type
520 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['content-type'], 'application/pdf')
521 3a19e99b Sofia Papagiannaki
522 3a19e99b Sofia Papagiannaki
        # assert uploaded content
523 3a19e99b Sofia Papagiannaki
        r = self.get(url)
524 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
525 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
526 3a19e99b Sofia Papagiannaki
527 3a19e99b Sofia Papagiannaki
    def test_maximum_upload_size_exceeds(self):
528 3a19e99b Sofia Papagiannaki
        cname = self.container
529 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
530 3a19e99b Sofia Papagiannaki
531 3a19e99b Sofia Papagiannaki
        # set container quota to 100
532 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname)
533 3a19e99b Sofia Papagiannaki
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
534 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
535 3a19e99b Sofia Papagiannaki
536 3a19e99b Sofia Papagiannaki
        info = self.get_container_info(cname)
537 3a19e99b Sofia Papagiannaki
        length = int(info['X-Container-Policy-Quota']) + 1
538 3a19e99b Sofia Papagiannaki
539 3a19e99b Sofia Papagiannaki
        data = get_random_data(length)
540 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
541 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
542 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 413)
543 3a19e99b Sofia Papagiannaki
544 3a19e99b Sofia Papagiannaki
    def test_upload_with_name_containing_slash(self):
545 3a19e99b Sofia Papagiannaki
        cname = self.container
546 3a19e99b Sofia Papagiannaki
        oname = '/%s' % get_random_data(8)
547 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
548 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
549 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
550 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
551 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
552 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
553 3a19e99b Sofia Papagiannaki
554 3a19e99b Sofia Papagiannaki
        r = self.get(url)
555 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
556 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
557 3a19e99b Sofia Papagiannaki
558 3a19e99b Sofia Papagiannaki
    def test_upload_unprocessable_entity(self):
559 3a19e99b Sofia Papagiannaki
        cname = self.container
560 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
561 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
562 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
563 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, HTTP_ETAG='123')
564 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 422)
565 3a19e99b Sofia Papagiannaki
566 3a19e99b Sofia Papagiannaki
#    def test_chunked_transfer(self):
567 3a19e99b Sofia Papagiannaki
#        cname = self.container
568 3a19e99b Sofia Papagiannaki
#        oname = '/%s' % get_random_data(8)
569 5fe43b8c Sofia Papagiannaki
#        data = get_random_data()
570 3a19e99b Sofia Papagiannaki
#        url = join_urls(self.pithos_path, self.user, cname, oname)
571 3a19e99b Sofia Papagiannaki
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
572 3a19e99b Sofia Papagiannaki
#        self.assertEqual(r.status_code, 201)
573 3a19e99b Sofia Papagiannaki
#        self.assertTrue('ETag' in r)
574 3a19e99b Sofia Papagiannaki
#        self.assertTrue('X-Object-Version' in r)
575 3a19e99b Sofia Papagiannaki
576 3a19e99b Sofia Papagiannaki
    def test_manifestation(self):
577 3a19e99b Sofia Papagiannaki
        cname = self.container
578 3a19e99b Sofia Papagiannaki
        prefix = 'myobject/'
579 3a19e99b Sofia Papagiannaki
        data = ''
580 3a19e99b Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
581 3a19e99b Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
582 3a19e99b Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
583 3a19e99b Sofia Papagiannaki
584 3a19e99b Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
585 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
586 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
587 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
588 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
589 3a19e99b Sofia Papagiannaki
590 3a19e99b Sofia Papagiannaki
        # assert object exists
591 3a19e99b Sofia Papagiannaki
        r = self.get(url)
592 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
593 3a19e99b Sofia Papagiannaki
594 3a19e99b Sofia Papagiannaki
        # assert its content
595 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
596 3a19e99b Sofia Papagiannaki
597 3a19e99b Sofia Papagiannaki
        # invalid manifestation
598 3a19e99b Sofia Papagiannaki
        invalid_manifestation = '%s/%s' % (cname, get_random_data(8))
599 3a19e99b Sofia Papagiannaki
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
600 3a19e99b Sofia Papagiannaki
        r = self.get(url)
601 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
602 3a19e99b Sofia Papagiannaki
603 3a19e99b Sofia Papagiannaki
    def test_create_zero_length_object(self):
604 3a19e99b Sofia Papagiannaki
        cname = self.container
605 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
606 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
607 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
608 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
609 3a19e99b Sofia Papagiannaki
610 3a19e99b Sofia Papagiannaki
        r = self.get(url)
611 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
612 3a19e99b Sofia Papagiannaki
        self.assertEqual(int(r['Content-Length']), 0)
613 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
614 3a19e99b Sofia Papagiannaki
615 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
616 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
617 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
618 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
619 5fe43b8c Sofia Papagiannaki
        hash = merkle('')
620 3a19e99b Sofia Papagiannaki
        self.assertEqual(hashes, [hash])
621 3a19e99b Sofia Papagiannaki
622 3a19e99b Sofia Papagiannaki
    def test_create_object_by_hashmap(self):
623 3a19e99b Sofia Papagiannaki
        cname = self.container
624 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
625 3a19e99b Sofia Papagiannaki
626 3a19e99b Sofia Papagiannaki
        # upload an object
627 3a19e99b Sofia Papagiannaki
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
628 3a19e99b Sofia Papagiannaki
        # get it hashmap
629 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
630 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
631 3a19e99b Sofia Papagiannaki
632 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
633 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
634 3a19e99b Sofia Papagiannaki
        r = self.put('%s?hashmap=' % url, data=r.content)
635 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
636 3a19e99b Sofia Papagiannaki
637 3a19e99b Sofia Papagiannaki
        r = self.get(url)
638 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
639 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
640 3a19e99b Sofia Papagiannaki
641 3a19e99b Sofia Papagiannaki
642 3a19e99b Sofia Papagiannaki
class ObjectCopy(PithosAPITest):
643 3a19e99b Sofia Papagiannaki
    def setUp(self):
644 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
645 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
646 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
647 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
648 3a19e99b Sofia Papagiannaki
649 3a19e99b Sofia Papagiannaki
        url = join_urls(
650 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
651 3a19e99b Sofia Papagiannaki
        r = self.head(url)
652 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
653 3a19e99b Sofia Papagiannaki
654 3a19e99b Sofia Papagiannaki
    def test_copy(self):
655 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
656 3a19e99b Sofia Papagiannaki
                                    self.object):
657 3a19e99b Sofia Papagiannaki
            # copy object
658 3a19e99b Sofia Papagiannaki
            oname = get_random_data(8)
659 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, oname)
660 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
661 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
662 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
663 3a19e99b Sofia Papagiannaki
664 3a19e99b Sofia Papagiannaki
            # assert copy success
665 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
666 3a19e99b Sofia Papagiannaki
667 3a19e99b Sofia Papagiannaki
            # assert access the new object
668 3a19e99b Sofia Papagiannaki
            r = self.head(url)
669 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
670 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
671 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
672 3a19e99b Sofia Papagiannaki
673 3a19e99b Sofia Papagiannaki
            # assert etag is the same
674 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
675 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
676 3a19e99b Sofia Papagiannaki
677 3a19e99b Sofia Papagiannaki
    def test_copy_from_different_container(self):
678 3a19e99b Sofia Papagiannaki
        cname = 'c2'
679 3a19e99b Sofia Papagiannaki
        self.create_container(cname)
680 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
681 3a19e99b Sofia Papagiannaki
                                    self.object):
682 3a19e99b Sofia Papagiannaki
            oname = get_random_data(8)
683 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname, oname)
684 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
685 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
686 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
687 3a19e99b Sofia Papagiannaki
688 3a19e99b Sofia Papagiannaki
            # assert copy success
689 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
690 3a19e99b Sofia Papagiannaki
691 3a19e99b Sofia Papagiannaki
            # assert access the new object
692 3a19e99b Sofia Papagiannaki
            r = self.head(url)
693 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
694 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
695 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
696 3a19e99b Sofia Papagiannaki
697 3a19e99b Sofia Papagiannaki
            # assert etag is the same
698 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
699 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
700 3a19e99b Sofia Papagiannaki
701 3a19e99b Sofia Papagiannaki
    def test_copy_invalid(self):
702 3a19e99b Sofia Papagiannaki
        # copy from non-existent object
703 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
704 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
705 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
706 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
707 3a19e99b Sofia Papagiannaki
                         self.container, get_random_data(8)))
708 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
709 3a19e99b Sofia Papagiannaki
710 3a19e99b Sofia Papagiannaki
        # copy from non-existent container
711 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
712 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
713 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
714 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
715 3a19e99b Sofia Papagiannaki
                         get_random_data(8), self.object))
716 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
717 3a19e99b Sofia Papagiannaki
718 3a19e99b Sofia Papagiannaki
    def test_copy_dir(self):
719 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
720 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
721 3a19e99b Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
722 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
723 3a19e99b Sofia Papagiannaki
        append = objects.append
724 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
725 3a19e99b Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_data(8)),
726 3a19e99b Sofia Papagiannaki
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
727 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
728 3a19e99b Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_data(8)),
729 3a19e99b Sofia Papagiannaki
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
730 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
731 3a19e99b Sofia Papagiannaki
732 3a19e99b Sofia Papagiannaki
        # copy dir
733 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
734 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
735 3a19e99b Sofia Papagiannaki
                        copy_folder)
736 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
737 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
738 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
739 3a19e99b Sofia Papagiannaki
740 3a19e99b Sofia Papagiannaki
        for obj in objects:
741 3a19e99b Sofia Papagiannaki
            # assert object exists
742 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
743 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
744 3a19e99b Sofia Papagiannaki
            r = self.head(url)
745 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
746 3a19e99b Sofia Papagiannaki
747 3a19e99b Sofia Papagiannaki
            # assert metadata
748 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, obj)
749 3a19e99b Sofia Papagiannaki
            for k in meta.keys():
750 3a19e99b Sofia Papagiannaki
                self.assertTrue(k in r)
751 3a19e99b Sofia Papagiannaki
                self.assertEqual(r[k], meta[k])
752 3a19e99b Sofia Papagiannaki
753 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
754 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
755 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
756 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
757 3a19e99b Sofia Papagiannaki
        r = self.head(url)
758 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
759 3a19e99b Sofia Papagiannaki
760 3a19e99b Sofia Papagiannaki
761 3a19e99b Sofia Papagiannaki
class ObjectMove(PithosAPITest):
762 3a19e99b Sofia Papagiannaki
    def setUp(self):
763 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
764 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
765 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
766 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
767 3a19e99b Sofia Papagiannaki
768 3a19e99b Sofia Papagiannaki
        url = join_urls(
769 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
770 3a19e99b Sofia Papagiannaki
        r = self.head(url)
771 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
772 3a19e99b Sofia Papagiannaki
773 3a19e99b Sofia Papagiannaki
    def test_move(self):
774 3a19e99b Sofia Papagiannaki
        # move object
775 3a19e99b Sofia Papagiannaki
        oname = get_random_data(8)
776 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
777 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
778 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
779 3a19e99b Sofia Papagiannaki
                         self.container, self.object))
780 3a19e99b Sofia Papagiannaki
781 3a19e99b Sofia Papagiannaki
        # assert move success
782 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
783 3a19e99b Sofia Papagiannaki
784 3a19e99b Sofia Papagiannaki
        # assert access the new object
785 3a19e99b Sofia Papagiannaki
        r = self.head(url)
786 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
787 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
788 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
789 3a19e99b Sofia Papagiannaki
790 3a19e99b Sofia Papagiannaki
        # assert etag is the same
791 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
792 3a19e99b Sofia Papagiannaki
793 3a19e99b Sofia Papagiannaki
        # assert the initial object has been deleted
794 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
795 3a19e99b Sofia Papagiannaki
                        self.object)
796 3a19e99b Sofia Papagiannaki
        r = self.head(url)
797 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
798 3a19e99b Sofia Papagiannaki
799 3a19e99b Sofia Papagiannaki
    def test_move_dir(self):
800 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
801 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
802 3a19e99b Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
803 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
804 3a19e99b Sofia Papagiannaki
        append = objects.append
805 3a19e99b Sofia Papagiannaki
        meta = {}
806 3a19e99b Sofia Papagiannaki
        meta[objects[0]] = {}
807 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
808 3a19e99b Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_data(8)),
809 3a19e99b Sofia Papagiannaki
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
810 3a19e99b Sofia Papagiannaki
        meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
811 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
812 3a19e99b Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_data(8)),
813 3a19e99b Sofia Papagiannaki
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
814 3a19e99b Sofia Papagiannaki
        meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
815 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
816 3a19e99b Sofia Papagiannaki
817 3a19e99b Sofia Papagiannaki
        # move dir
818 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
819 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
820 3a19e99b Sofia Papagiannaki
                        copy_folder)
821 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
822 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
823 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
824 3a19e99b Sofia Papagiannaki
825 3a19e99b Sofia Papagiannaki
        for obj in objects:
826 3a19e99b Sofia Papagiannaki
            # assert initial object does not exist
827 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
828 3a19e99b Sofia Papagiannaki
            r = self.head(url)
829 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
830 3a19e99b Sofia Papagiannaki
831 3a19e99b Sofia Papagiannaki
            # assert new object was created
832 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
833 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
834 3a19e99b Sofia Papagiannaki
            r = self.head(url)
835 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
836 3a19e99b Sofia Papagiannaki
837 3a19e99b Sofia Papagiannaki
#            # assert metadata
838 3a19e99b Sofia Papagiannaki
#            for k in meta[obj].keys():
839 3a19e99b Sofia Papagiannaki
#                self.assertTrue(k in r)
840 3a19e99b Sofia Papagiannaki
#                self.assertEqual(r[k], meta[obj][k])
841 3a19e99b Sofia Papagiannaki
842 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
843 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
844 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
845 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
846 3a19e99b Sofia Papagiannaki
        r = self.head(url)
847 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
848 3a19e99b Sofia Papagiannaki
849 3a19e99b Sofia Papagiannaki
850 3a19e99b Sofia Papagiannaki
class ObjectPost(PithosAPITest):
851 3a19e99b Sofia Papagiannaki
    def setUp(self):
852 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
853 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
854 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
855 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
856 3a19e99b Sofia Papagiannaki
857 3a19e99b Sofia Papagiannaki
    def test_update_meta(self):
858 3a19e99b Sofia Papagiannaki
        with AssertUUidInvariant(self.get_object_info,
859 3a19e99b Sofia Papagiannaki
                                 self.container,
860 3a19e99b Sofia Papagiannaki
                                 self.object):
861 3a19e99b Sofia Papagiannaki
            # update metadata
862 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 256}
863 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
864 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
865 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
866 3a19e99b Sofia Papagiannaki
                            self.object)
867 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
868 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
869 3a19e99b Sofia Papagiannaki
870 3a19e99b Sofia Papagiannaki
            # assert metadata have been updated
871 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, self.object)
872 3a19e99b Sofia Papagiannaki
873 3a19e99b Sofia Papagiannaki
            for k, v in d.items():
874 3a19e99b Sofia Papagiannaki
                key = 'X-Object-Meta-%s' % k.title()
875 3a19e99b Sofia Papagiannaki
                self.assertTrue(key in meta)
876 3a19e99b Sofia Papagiannaki
                self.assertTrue(meta[key], v)
877 3a19e99b Sofia Papagiannaki
878 3a19e99b Sofia Papagiannaki
            # Header key too large
879 3a19e99b Sofia Papagiannaki
            d = {'a' * 115: 'b' * 256}
880 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
881 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
882 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
883 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
884 3a19e99b Sofia Papagiannaki
885 3a19e99b Sofia Papagiannaki
            # Header value too large
886 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 257}
887 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
888 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
889 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
890 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
891 3a19e99b Sofia Papagiannaki
892 3a19e99b Sofia Papagiannaki
#            # Check utf-8 meta
893 3a19e99b Sofia Papagiannaki
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
894 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
895 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
896 3a19e99b Sofia Papagiannaki
#            url = join_urls(self.pithos_path, self.user, self.container,
897 3a19e99b Sofia Papagiannaki
#                            self.object)
898 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
899 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 202)
900 3a19e99b Sofia Papagiannaki
#
901 3a19e99b Sofia Papagiannaki
#            # assert metadata have been updated
902 3a19e99b Sofia Papagiannaki
#            meta = self.get_object_meta(self.container, self.object)
903 3a19e99b Sofia Papagiannaki
#
904 3a19e99b Sofia Papagiannaki
#            for k, v in d.items():
905 3a19e99b Sofia Papagiannaki
#                key = 'X-Object-Meta-%s' % k.title()
906 3a19e99b Sofia Papagiannaki
#                self.assertTrue(key in meta)
907 3a19e99b Sofia Papagiannaki
#                self.assertTrue(meta[key], v)
908 3a19e99b Sofia Papagiannaki
#
909 3a19e99b Sofia Papagiannaki
#            # Header key too large
910 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * (256 / 2)}
911 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
912 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
913 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
914 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
915 3a19e99b Sofia Papagiannaki
#
916 3a19e99b Sofia Papagiannaki
#            # Header value too large
917 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * 256}
918 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
919 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
920 3a19e99b Sofia Papagiannaki
#            r = self.udpate(url, content_type='', **kwargs)
921 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
922 3a19e99b Sofia Papagiannaki
923 3a19e99b Sofia Papagiannaki
    def test_update_object(self):
924 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
925 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
926 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
927 3a19e99b Sofia Papagiannaki
                block_size + 1, 2 * block_size))[:2]
928 3a19e99b Sofia Papagiannaki
929 3a19e99b Sofia Papagiannaki
        length = len(odata)
930 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
931 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
932 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
933 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
934 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
935 3a19e99b Sofia Papagiannaki
936 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
937 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
938 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
939 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
940 3a19e99b Sofia Papagiannaki
941 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
942 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
943 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
944 3a19e99b Sofia Papagiannaki
                                     data)
945 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
946 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
947 3a19e99b Sofia Papagiannaki
        else:
948 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
949 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
950 3a19e99b Sofia Papagiannaki
951 3a19e99b Sofia Papagiannaki
        # check modified object
952 3a19e99b Sofia Papagiannaki
        r = self.get(url)
953 3a19e99b Sofia Papagiannaki
954 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
955 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
956 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
957 3a19e99b Sofia Papagiannaki
958 3a19e99b Sofia Papagiannaki
    def test_update_object_divided_by_blocksize(self):
959 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
960 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(self.container,
961 3a19e99b Sofia Papagiannaki
                                          length=2 * block_size)[:2]
962 3a19e99b Sofia Papagiannaki
963 3a19e99b Sofia Papagiannaki
        length = len(odata)
964 3a19e99b Sofia Papagiannaki
        first_byte_pos = block_size
965 3a19e99b Sofia Papagiannaki
        last_byte_pos = 2 * block_size - 1
966 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
967 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
968 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
969 3a19e99b Sofia Papagiannaki
970 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
971 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
972 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
973 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
974 3a19e99b Sofia Papagiannaki
975 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
976 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
977 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
978 3a19e99b Sofia Papagiannaki
                                     data)
979 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
980 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
981 3a19e99b Sofia Papagiannaki
        else:
982 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
983 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
984 3a19e99b Sofia Papagiannaki
985 3a19e99b Sofia Papagiannaki
        # check modified object
986 3a19e99b Sofia Papagiannaki
        r = self.get(url)
987 3a19e99b Sofia Papagiannaki
988 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
989 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
990 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
991 3a19e99b Sofia Papagiannaki
992 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_content_length(self):
993 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
994 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
995 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
996 3a19e99b Sofia Papagiannaki
                block_size + 1, 2 * block_size))[:2]
997 3a19e99b Sofia Papagiannaki
998 3a19e99b Sofia Papagiannaki
        length = len(odata)
999 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1000 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
1001 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1002 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1003 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1004 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1005 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range,
1006 3a19e99b Sofia Papagiannaki
                  'CONTENT_LENGTH': partial + 1}
1007 3a19e99b Sofia Papagiannaki
1008 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1009 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1010 3a19e99b Sofia Papagiannaki
1011 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1012 3a19e99b Sofia Papagiannaki
1013 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_range(self):
1014 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1015 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1016 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1017 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1018 3a19e99b Sofia Papagiannaki
1019 3a19e99b Sofia Papagiannaki
        length = len(odata)
1020 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1021 3a19e99b Sofia Papagiannaki
        last_byte_pos = first_byte_pos - 1
1022 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1023 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1024 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1025 3a19e99b Sofia Papagiannaki
1026 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1027 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1028 3a19e99b Sofia Papagiannaki
1029 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1030 3a19e99b Sofia Papagiannaki
1031 3a19e99b Sofia Papagiannaki
    def test_update_object_out_of_limits(self):
1032 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1033 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1034 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1035 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1036 3a19e99b Sofia Papagiannaki
1037 3a19e99b Sofia Papagiannaki
        length = len(odata)
1038 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1039 3a19e99b Sofia Papagiannaki
        last_byte_pos = length + 1
1040 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1041 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1042 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1043 3a19e99b Sofia Papagiannaki
1044 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1045 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1046 3a19e99b Sofia Papagiannaki
1047 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1048 3a19e99b Sofia Papagiannaki
1049 3a19e99b Sofia Papagiannaki
    def test_append(self):
1050 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1051 5fe43b8c Sofia Papagiannaki
        length = len(data)
1052 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1053 3a19e99b Sofia Papagiannaki
                        self.object)
1054 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1055 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_LENGTH=str(length),
1056 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*')
1057 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1058 3a19e99b Sofia Papagiannaki
1059 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1060 3a19e99b Sofia Papagiannaki
        content = r.content
1061 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(content), len(self.object_data) + length)
1062 3a19e99b Sofia Papagiannaki
        self.assertEqual(content, self.object_data + data)
1063 3a19e99b Sofia Papagiannaki
1064 5fe43b8c Sofia Papagiannaki
    # TODO Fix the test
1065 5fe43b8c Sofia Papagiannaki
    def _test_update_with_chunked_transfer(self):
1066 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1067 5fe43b8c Sofia Papagiannaki
        length = len(data)
1068 3a19e99b Sofia Papagiannaki
1069 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1070 3a19e99b Sofia Papagiannaki
                        self.object)
1071 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1072 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1073 3a19e99b Sofia Papagiannaki
                      HTTP_TRANSFER_ENCODING='chunked')
1074 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1075 3a19e99b Sofia Papagiannaki
1076 3a19e99b Sofia Papagiannaki
        # check modified object
1077 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1078 3a19e99b Sofia Papagiannaki
        content = r.content
1079 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[0:length], data)
1080 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[length:], self.object_data[length:])
1081 3a19e99b Sofia Papagiannaki
1082 3a19e99b Sofia Papagiannaki
    def test_update_from_other_object(self):
1083 3a19e99b Sofia Papagiannaki
        src = self.object
1084 3a19e99b Sofia Papagiannaki
        dest = get_random_data(8)
1085 3a19e99b Sofia Papagiannaki
1086 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1087 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1088 3a19e99b Sofia Papagiannaki
        source_data = r.content
1089 3a19e99b Sofia Papagiannaki
        source_meta = self.get_object_info(self.container, src)
1090 3a19e99b Sofia Papagiannaki
1091 3a19e99b Sofia Papagiannaki
        # update zero length object
1092 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1093 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
1094 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1095 3a19e99b Sofia Papagiannaki
1096 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1097 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1098 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1099 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1100 3a19e99b Sofia Papagiannaki
1101 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1102 3a19e99b Sofia Papagiannaki
        dest_data = r.content
1103 3a19e99b Sofia Papagiannaki
        dest_meta = self.get_object_info(self.container, dest)
1104 3a19e99b Sofia Papagiannaki
1105 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_data, dest_data)
1106 3a19e99b Sofia Papagiannaki
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1107 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_meta['X-Object-Hash'],
1108 3a19e99b Sofia Papagiannaki
                         dest_meta['X-Object-Hash'])
1109 3a19e99b Sofia Papagiannaki
        self.assertTrue(
1110 3a19e99b Sofia Papagiannaki
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1111 3a19e99b Sofia Papagiannaki
1112 3a19e99b Sofia Papagiannaki
    def test_update_range_from_other_object(self):
1113 3a19e99b Sofia Papagiannaki
        src = self.object
1114 3a19e99b Sofia Papagiannaki
        dest = get_random_data(8)
1115 3a19e99b Sofia Papagiannaki
1116 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1117 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1118 3a19e99b Sofia Papagiannaki
        source_data = r.content
1119 3a19e99b Sofia Papagiannaki
1120 3a19e99b Sofia Papagiannaki
        # update zero length object
1121 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1122 5fe43b8c Sofia Papagiannaki
        initial_data = get_random_data()
1123 5fe43b8c Sofia Papagiannaki
        length = len(initial_data)
1124 a1f429b2 Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1125 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1126 3a19e99b Sofia Papagiannaki
1127 3a19e99b Sofia Papagiannaki
        offset = random.randint(1, length - 2)
1128 3a19e99b Sofia Papagiannaki
        upto = random.randint(offset, length - 1)
1129 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1130 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1131 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1132 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1133 3a19e99b Sofia Papagiannaki
1134 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1135 3a19e99b Sofia Papagiannaki
        content = r.content
1136 a1f429b2 Sofia Papagiannaki
        self.assertEqual(content, (initial_data[:offset] +
1137 a1f429b2 Sofia Papagiannaki
                                   source_data[:upto - offset + 1] +
1138 a1f429b2 Sofia Papagiannaki
                                   initial_data[upto + 1:]))
1139 3a19e99b Sofia Papagiannaki
1140 3a19e99b Sofia Papagiannaki
1141 3a19e99b Sofia Papagiannaki
class ObjectDelete(PithosAPITest):
1142 3a19e99b Sofia Papagiannaki
    def setUp(self):
1143 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
1144 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
1145 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
1146 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
1147 3a19e99b Sofia Papagiannaki
1148 3a19e99b Sofia Papagiannaki
    def test_delete(self):
1149 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1150 3a19e99b Sofia Papagiannaki
                        self.object)
1151 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1152 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1153 3a19e99b Sofia Papagiannaki
1154 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1155 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1156 3a19e99b Sofia Papagiannaki
1157 3a19e99b Sofia Papagiannaki
    def test_delete_non_existent(self):
1158 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1159 3a19e99b Sofia Papagiannaki
                        get_random_data(8))
1160 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1161 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1162 3a19e99b Sofia Papagiannaki
1163 3a19e99b Sofia Papagiannaki
    def test_delete_dir(self):
1164 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1165 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
1166 3a19e99b Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_data(8)))[0]
1167 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
1168 3a19e99b Sofia Papagiannaki
        append = objects.append
1169 3a19e99b Sofia Papagiannaki
        meta = {}
1170 3a19e99b Sofia Papagiannaki
        meta[objects[0]] = {}
1171 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1172 3a19e99b Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_data(8)),
1173 3a19e99b Sofia Papagiannaki
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
1174 3a19e99b Sofia Papagiannaki
        meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
1175 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1176 3a19e99b Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_data(8)),
1177 3a19e99b Sofia Papagiannaki
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
1178 3a19e99b Sofia Papagiannaki
        meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
1179 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1180 3a19e99b Sofia Papagiannaki
1181 3a19e99b Sofia Papagiannaki
        # move dir
1182 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1183 3a19e99b Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url)
1184 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1185 3a19e99b Sofia Papagiannaki
1186 3a19e99b Sofia Papagiannaki
        for obj in objects:
1187 3a19e99b Sofia Papagiannaki
            # assert object does not exist
1188 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1189 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1190 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1191 3a19e99b Sofia Papagiannaki
1192 3a19e99b Sofia Papagiannaki
        # assert other has not been deleted
1193 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, other)
1194 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1195 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)