Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ ee7a28be

History | View | Annotate | Download (50 kB)

1 3a19e99b Sofia Papagiannaki
#!/usr/bin/env python
2 3a19e99b Sofia Papagiannaki
#coding=utf8
3 3a19e99b Sofia Papagiannaki
4 3a19e99b Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 3a19e99b Sofia Papagiannaki
#
6 3a19e99b Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 3a19e99b Sofia Papagiannaki
# without modification, are permitted provided that the following
8 3a19e99b Sofia Papagiannaki
# conditions are met:
9 3a19e99b Sofia Papagiannaki
#
10 3a19e99b Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 3a19e99b Sofia Papagiannaki
#      disclaimer.
13 3a19e99b Sofia Papagiannaki
#
14 3a19e99b Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 3a19e99b Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 3a19e99b Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 3a19e99b Sofia Papagiannaki
#      provided with the distribution.
18 3a19e99b Sofia Papagiannaki
#
19 3a19e99b Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 3a19e99b Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 3a19e99b Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 3a19e99b Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 3a19e99b Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 3a19e99b Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 3a19e99b Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 3a19e99b Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 3a19e99b Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 3a19e99b Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 3a19e99b Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 3a19e99b Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 3a19e99b Sofia Papagiannaki
#
32 3a19e99b Sofia Papagiannaki
# The views and conclusions contained in the software and
33 3a19e99b Sofia Papagiannaki
# documentation are those of the authors and should not be
34 3a19e99b Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 3a19e99b Sofia Papagiannaki
# or implied, of GRNET S.A.
36 3a19e99b Sofia Papagiannaki
37 3a19e99b Sofia Papagiannaki
from collections import defaultdict
38 3a19e99b Sofia Papagiannaki
from urllib import quote
39 5fe43b8c Sofia Papagiannaki
from functools import partial
40 3a19e99b Sofia Papagiannaki
41 3a19e99b Sofia Papagiannaki
from pithos.api.test import (PithosAPITest, pithos_settings,
42 3a19e99b Sofia Papagiannaki
                             AssertMappingInvariant, AssertUUidInvariant,
43 5fe43b8c Sofia Papagiannaki
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44 3a19e99b Sofia Papagiannaki
                             DATE_FORMATS)
45 bc4f25c0 Sofia Papagiannaki
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46 bc4f25c0 Sofia Papagiannaki
                                  get_random_data, get_random_name)
47 3a19e99b Sofia Papagiannaki
48 3a19e99b Sofia Papagiannaki
from synnefo.lib import join_urls
49 3a19e99b Sofia Papagiannaki
50 3a19e99b Sofia Papagiannaki
import django.utils.simplejson as json
51 3a19e99b Sofia Papagiannaki
52 3a19e99b Sofia Papagiannaki
import random
53 3a19e99b Sofia Papagiannaki
import re
54 3a19e99b Sofia Papagiannaki
import datetime
55 5fe43b8c Sofia Papagiannaki
import time as _time
56 5fe43b8c Sofia Papagiannaki
57 5fe43b8c Sofia Papagiannaki
merkle = partial(merkle,
58 5fe43b8c Sofia Papagiannaki
                 blocksize=TEST_BLOCK_SIZE,
59 5fe43b8c Sofia Papagiannaki
                 blockhash=TEST_HASH_ALGORITHM)
60 3a19e99b Sofia Papagiannaki
61 3a19e99b Sofia Papagiannaki
62 2a194295 Sofia Papagiannaki
class ObjectHead(PithosAPITest):
63 2a194295 Sofia Papagiannaki
    def setUp(self):
64 2a194295 Sofia Papagiannaki
        cname = self.create_container()[0]
65 2a194295 Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
66 2a194295 Sofia Papagiannaki
67 2a194295 Sofia Papagiannaki
        url = join_urls(self.pithos_path, cname, oname)
68 2a194295 Sofia Papagiannaki
        r = self.head(url)
69 2a194295 Sofia Papagiannaki
        map(lambda i: self.assertTrue(i in r),
70 2a194295 Sofia Papagiannaki
            ['Etag',
71 2a194295 Sofia Papagiannaki
             'Content-Length',
72 2a194295 Sofia Papagiannaki
             'Content-Type',
73 2a194295 Sofia Papagiannaki
             'Last-Modified',
74 2a194295 Sofia Papagiannaki
             'Content-Encoding',
75 2a194295 Sofia Papagiannaki
             'Content-Disposition',
76 2a194295 Sofia Papagiannaki
             'X-Object-Hash',
77 2a194295 Sofia Papagiannaki
             'X-Object-UUID',
78 2a194295 Sofia Papagiannaki
             'X-Object-Version',
79 2a194295 Sofia Papagiannaki
             'X-Object-Version-Timestamp',
80 2a194295 Sofia Papagiannaki
             'X-Object-Modified-By',
81 2a194295 Sofia Papagiannaki
             'X-Object-Manifest'])
82 2a194295 Sofia Papagiannaki
83 2a194295 Sofia Papagiannaki
84 3a19e99b Sofia Papagiannaki
class ObjectGet(PithosAPITest):
85 3a19e99b Sofia Papagiannaki
    def setUp(self):
86 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
87 3a19e99b Sofia Papagiannaki
        self.containers = ['c1', 'c2']
88 3a19e99b Sofia Papagiannaki
89 3a19e99b Sofia Papagiannaki
        # create some containers
90 3a19e99b Sofia Papagiannaki
        for c in self.containers:
91 3a19e99b Sofia Papagiannaki
            self.create_container(c)
92 3a19e99b Sofia Papagiannaki
93 3a19e99b Sofia Papagiannaki
        # upload files
94 3a19e99b Sofia Papagiannaki
        self.objects = defaultdict(list)
95 3a19e99b Sofia Papagiannaki
        self.objects['c1'].append(self.upload_object('c1')[0])
96 3a19e99b Sofia Papagiannaki
97 3a19e99b Sofia Papagiannaki
    def test_versions(self):
98 3a19e99b Sofia Papagiannaki
        c = 'c1'
99 3a19e99b Sofia Papagiannaki
        o = self.objects[c][0]
100 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
101 3a19e99b Sofia Papagiannaki
102 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
103 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
104 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
105 3a19e99b Sofia Papagiannaki
106 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
107 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
108 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
109 3a19e99b Sofia Papagiannaki
        l1 = json.loads(r.content)['versions']
110 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l1), 2)
111 3a19e99b Sofia Papagiannaki
112 3a19e99b Sofia Papagiannaki
        # update meta
113 3a19e99b Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
114 3a19e99b Sofia Papagiannaki
                'HTTP_X_OBJECT_META_STOCK': 'True'}
115 3a19e99b Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
116 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
117 3a19e99b Sofia Papagiannaki
118 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
119 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
120 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
121 3a19e99b Sofia Papagiannaki
        l2 = json.loads(r.content)['versions']
122 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l2), len(l1) + 1)
123 3a19e99b Sofia Papagiannaki
        self.assertEqual(l2[:-1], l1)
124 3a19e99b Sofia Papagiannaki
125 3a19e99b Sofia Papagiannaki
        vserial, _ = l2[-2]
126 3a19e99b Sofia Papagiannaki
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
127 6ee6677e Sofia Papagiannaki
                         {'Quality': 'AAA'})
128 3a19e99b Sofia Papagiannaki
129 3a19e99b Sofia Papagiannaki
        # update data
130 3a19e99b Sofia Papagiannaki
        self.append_object_data(c, o)
131 3a19e99b Sofia Papagiannaki
132 3a19e99b Sofia Papagiannaki
        # assert a newly created version has been created
133 3a19e99b Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
134 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
135 3a19e99b Sofia Papagiannaki
        l3 = json.loads(r.content)['versions']
136 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(l3), len(l2) + 1)
137 3a19e99b Sofia Papagiannaki
        self.assertEqual(l3[:-1], l2)
138 3a19e99b Sofia Papagiannaki
139 adce84cd Sofia Papagiannaki
    def test_get_version(self):
140 adce84cd Sofia Papagiannaki
        c = 'c1'
141 adce84cd Sofia Papagiannaki
        o = self.objects[c][0]
142 adce84cd Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
143 adce84cd Sofia Papagiannaki
144 adce84cd Sofia Papagiannaki
        # Update metadata
145 adce84cd Sofia Papagiannaki
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
146 adce84cd Sofia Papagiannaki
        r = self.post(url, content_type='', **meta)
147 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
148 adce84cd Sofia Papagiannaki
149 adce84cd Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, c, o)
150 adce84cd Sofia Papagiannaki
        r = self.get('%s?version=list&format=json' % url)
151 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
152 adce84cd Sofia Papagiannaki
        l = json.loads(r.content)['versions']
153 adce84cd Sofia Papagiannaki
        self.assertEqual(len(l), 2)
154 adce84cd Sofia Papagiannaki
155 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, l[0][0]))
156 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
157 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Quality' not in r)
158 adce84cd Sofia Papagiannaki
159 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, l[1][0]))
160 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
161 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Quality' in r)
162 adce84cd Sofia Papagiannaki
163 adce84cd Sofia Papagiannaki
        # test invalid version
164 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=-1' % url)
165 adce84cd Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
166 adce84cd Sofia Papagiannaki
167 adce84cd Sofia Papagiannaki
        other_name, other_data, r = self.upload_object(c)
168 adce84cd Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
169 adce84cd Sofia Papagiannaki
        other_version = r['X-Object-Version']
170 adce84cd Sofia Papagiannaki
171 adce84cd Sofia Papagiannaki
        self.assertTrue(o != other_name)
172 adce84cd Sofia Papagiannaki
173 adce84cd Sofia Papagiannaki
        r = self.get('%s?version=%s' % (url, other_version))
174 6ee6677e Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
175 adce84cd Sofia Papagiannaki
176 adce84cd Sofia Papagiannaki
        r = self.head('%s?version=%s' % (url, other_version))
177 6ee6677e Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
178 adce84cd Sofia Papagiannaki
179 3a19e99b Sofia Papagiannaki
    def test_objects_with_trailing_spaces(self):
180 3a19e99b Sofia Papagiannaki
        # create object
181 3a19e99b Sofia Papagiannaki
        oname = self.upload_object('c1')[0]
182 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
183 3a19e99b Sofia Papagiannaki
184 3a19e99b Sofia Papagiannaki
        r = self.get(quote('%s ' % url))
185 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
186 3a19e99b Sofia Papagiannaki
187 3a19e99b Sofia Papagiannaki
        # delete object
188 3a19e99b Sofia Papagiannaki
        self.delete(url)
189 3a19e99b Sofia Papagiannaki
190 3a19e99b Sofia Papagiannaki
        r = self.get(url)
191 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
192 3a19e99b Sofia Papagiannaki
193 3a19e99b Sofia Papagiannaki
        # upload object with trailing space
194 bc4f25c0 Sofia Papagiannaki
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
195 3a19e99b Sofia Papagiannaki
196 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
197 3a19e99b Sofia Papagiannaki
        r = self.get(url)
198 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
199 3a19e99b Sofia Papagiannaki
200 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
201 3a19e99b Sofia Papagiannaki
        r = self.get(url)
202 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
203 3a19e99b Sofia Papagiannaki
204 3a19e99b Sofia Papagiannaki
    def test_get_partial(self):
205 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
206 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
207 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
208 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-499')
209 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
210 3a19e99b Sofia Papagiannaki
        data = r.content
211 3a19e99b Sofia Papagiannaki
        self.assertEqual(data, odata[:500])
212 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
213 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
214 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
215 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
216 3a19e99b Sofia Papagiannaki
217 3a19e99b Sofia Papagiannaki
    def test_get_final_500(self):
218 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
219 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
220 3a19e99b Sofia Papagiannaki
        size = len(odata)
221 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
222 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=-500')
223 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
224 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[-500:])
225 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
226 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
227 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
228 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
229 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
230 3a19e99b Sofia Papagiannaki
231 3a19e99b Sofia Papagiannaki
    def test_get_rest(self):
232 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
233 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
234 3a19e99b Sofia Papagiannaki
        size = len(odata)
235 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
236 3a19e99b Sofia Papagiannaki
        offset = len(odata) - random.randint(1, 512)
237 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
238 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
239 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata[offset:])
240 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Range' in r)
241 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['Content-Range'],
242 3a19e99b Sofia Papagiannaki
                         'bytes %s-%s/%s' % (offset, size - 1, size))
243 3a19e99b Sofia Papagiannaki
        self.assertTrue('Content-Type' in r)
244 3a19e99b Sofia Papagiannaki
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
245 3a19e99b Sofia Papagiannaki
246 3a19e99b Sofia Papagiannaki
    def test_get_range_not_satisfiable(self):
247 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
248 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=512)[:-1]
249 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
250 3a19e99b Sofia Papagiannaki
251 3a19e99b Sofia Papagiannaki
        # TODO
252 3a19e99b Sofia Papagiannaki
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
253 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r.status_code, 416)
254 3a19e99b Sofia Papagiannaki
255 3a19e99b Sofia Papagiannaki
        offset = len(odata) + 1
256 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
257 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
258 3a19e99b Sofia Papagiannaki
259 3a19e99b Sofia Papagiannaki
    def test_multiple_range(self):
260 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
261 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
262 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
263 3a19e99b Sofia Papagiannaki
264 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '1000-']
265 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
266 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
267 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 206)
268 3a19e99b Sofia Papagiannaki
        self.assertTrue('content-type' in r)
269 3a19e99b Sofia Papagiannaki
        p = re.compile(
270 3a19e99b Sofia Papagiannaki
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
271 3a19e99b Sofia Papagiannaki
            re.I)
272 3a19e99b Sofia Papagiannaki
        m = p.match(r['content-type'])
273 3a19e99b Sofia Papagiannaki
        if m is None:
274 3a19e99b Sofia Papagiannaki
            self.fail('Invalid multiple range content type')
275 3a19e99b Sofia Papagiannaki
        boundary = m.groupdict()['boundary']
276 3a19e99b Sofia Papagiannaki
        cparts = r.content.split('--%s' % boundary)[1:-1]
277 3a19e99b Sofia Papagiannaki
278 3a19e99b Sofia Papagiannaki
        # assert content parts length
279 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(cparts), len(l))
280 3a19e99b Sofia Papagiannaki
281 3a19e99b Sofia Papagiannaki
        # for each content part assert headers
282 3a19e99b Sofia Papagiannaki
        i = 0
283 3a19e99b Sofia Papagiannaki
        for cpart in cparts:
284 3a19e99b Sofia Papagiannaki
            content = cpart.split('\r\n')
285 3a19e99b Sofia Papagiannaki
            headers = content[1:3]
286 3a19e99b Sofia Papagiannaki
            content_range = headers[0].split(': ')
287 3a19e99b Sofia Papagiannaki
            self.assertEqual(content_range[0], 'Content-Range')
288 3a19e99b Sofia Papagiannaki
289 3a19e99b Sofia Papagiannaki
            r = l[i].split('-')
290 3a19e99b Sofia Papagiannaki
            if not r[0] and not r[1]:
291 3a19e99b Sofia Papagiannaki
                pass
292 3a19e99b Sofia Papagiannaki
            elif not r[0]:
293 3a19e99b Sofia Papagiannaki
                start = len(odata) - int(r[1])
294 3a19e99b Sofia Papagiannaki
                end = len(odata)
295 3a19e99b Sofia Papagiannaki
            elif not r[1]:
296 3a19e99b Sofia Papagiannaki
                start = int(r[0])
297 3a19e99b Sofia Papagiannaki
                end = len(odata)
298 3a19e99b Sofia Papagiannaki
            else:
299 3a19e99b Sofia Papagiannaki
                start = int(r[0])
300 3a19e99b Sofia Papagiannaki
                end = int(r[1]) + 1
301 3a19e99b Sofia Papagiannaki
            fdata = odata[start:end]
302 3a19e99b Sofia Papagiannaki
            sdata = '\r\n'.join(content[4:-1])
303 3a19e99b Sofia Papagiannaki
            self.assertEqual(len(fdata), len(sdata))
304 3a19e99b Sofia Papagiannaki
            self.assertEquals(fdata, sdata)
305 3a19e99b Sofia Papagiannaki
            i += 1
306 3a19e99b Sofia Papagiannaki
307 3a19e99b Sofia Papagiannaki
    def test_multiple_range_not_satisfiable(self):
308 3a19e99b Sofia Papagiannaki
        # perform get with multiple range
309 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
310 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
311 3a19e99b Sofia Papagiannaki
        out_of_range = len(odata) + 1
312 3a19e99b Sofia Papagiannaki
        l = ['0-499', '-500', '%d-' % out_of_range]
313 3a19e99b Sofia Papagiannaki
        ranges = 'bytes=%s' % ','.join(l)
314 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
315 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_RANGE=ranges)
316 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
317 3a19e99b Sofia Papagiannaki
318 5fe43b8c Sofia Papagiannaki
    def test_get_if_match(self):
319 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
320 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
321 3a19e99b Sofia Papagiannaki
322 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
323 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
324 3a19e99b Sofia Papagiannaki
325 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
326 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
327 3a19e99b Sofia Papagiannaki
        else:
328 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
329 3a19e99b Sofia Papagiannaki
330 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=etag)
331 3a19e99b Sofia Papagiannaki
332 3a19e99b Sofia Papagiannaki
        # assert get success
333 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
334 3a19e99b Sofia Papagiannaki
335 3a19e99b Sofia Papagiannaki
        # assert response content
336 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
337 3a19e99b Sofia Papagiannaki
338 5fe43b8c Sofia Papagiannaki
    def test_get_if_match_star(self):
339 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
340 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
341 3a19e99b Sofia Papagiannaki
342 3a19e99b Sofia Papagiannaki
        # perform get with If-Match *
343 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
344 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH='*')
345 3a19e99b Sofia Papagiannaki
346 3a19e99b Sofia Papagiannaki
        # assert get success
347 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
348 3a19e99b Sofia Papagiannaki
349 3a19e99b Sofia Papagiannaki
        # assert response content
350 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
351 3a19e99b Sofia Papagiannaki
352 5fe43b8c Sofia Papagiannaki
    def test_get_multiple_if_match(self):
353 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
354 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
355 3a19e99b Sofia Papagiannaki
356 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
357 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
358 3a19e99b Sofia Papagiannaki
359 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
360 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
361 3a19e99b Sofia Papagiannaki
        else:
362 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
363 3a19e99b Sofia Papagiannaki
364 5fe43b8c Sofia Papagiannaki
        quoted = lambda s: '"%s"' % s
365 5fe43b8c Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=','.join(
366 5fe43b8c Sofia Papagiannaki
            [quoted(etag), quoted(get_random_data(64))]))
367 3a19e99b Sofia Papagiannaki
368 3a19e99b Sofia Papagiannaki
        # assert get success
369 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
370 3a19e99b Sofia Papagiannaki
371 3a19e99b Sofia Papagiannaki
        # assert response content
372 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
373 3a19e99b Sofia Papagiannaki
374 3a19e99b Sofia Papagiannaki
    def test_if_match_precondition_failed(self):
375 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
376 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
377 3a19e99b Sofia Papagiannaki
378 3a19e99b Sofia Papagiannaki
        # perform get with If-Match
379 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
380 bc4f25c0 Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
381 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 412)
382 3a19e99b Sofia Papagiannaki
383 a1f429b2 Sofia Papagiannaki
    def test_if_none_match(self):
384 3a19e99b Sofia Papagiannaki
        # upload object
385 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
386 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
387 3a19e99b Sofia Papagiannaki
388 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
389 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(odata)
390 3a19e99b Sofia Papagiannaki
        else:
391 3a19e99b Sofia Papagiannaki
            etag = merkle(odata)
392 3a19e99b Sofia Papagiannaki
393 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
394 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
395 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
396 3a19e99b Sofia Papagiannaki
397 3a19e99b Sofia Papagiannaki
        # assert precondition_failed
398 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
399 3a19e99b Sofia Papagiannaki
400 3a19e99b Sofia Papagiannaki
        # update object data
401 3a19e99b Sofia Papagiannaki
        r = self.append_object_data(cname, oname)[-1]
402 3a19e99b Sofia Papagiannaki
        self.assertTrue(etag != r['ETag'])
403 3a19e99b Sofia Papagiannaki
404 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match
405 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
406 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
407 3a19e99b Sofia Papagiannaki
408 3a19e99b Sofia Papagiannaki
        # assert get success
409 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
410 3a19e99b Sofia Papagiannaki
411 a1f429b2 Sofia Papagiannaki
    def test_if_none_match_star(self):
412 3a19e99b Sofia Papagiannaki
        # upload object
413 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
414 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
415 3a19e99b Sofia Papagiannaki
416 3a19e99b Sofia Papagiannaki
        # perform get with If-None-Match with star
417 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
418 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
419 3a19e99b Sofia Papagiannaki
420 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 304)
421 3a19e99b Sofia Papagiannaki
422 3a19e99b Sofia Papagiannaki
    def test_if_modified_since(self):
423 3a19e99b Sofia Papagiannaki
        # upload object
424 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
425 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
426 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
427 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
428 3a19e99b Sofia Papagiannaki
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
429 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
430 3a19e99b Sofia Papagiannaki
431 3a19e99b Sofia Papagiannaki
        # Check not modified since
432 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
433 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
434 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
435 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 304)
436 3a19e99b Sofia Papagiannaki
437 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
438 3a19e99b Sofia Papagiannaki
439 3a19e99b Sofia Papagiannaki
        # update object data
440 3a19e99b Sofia Papagiannaki
        appended_data = self.append_object_data(cname, oname)[1]
441 3a19e99b Sofia Papagiannaki
442 3a19e99b Sofia Papagiannaki
        # Check modified since
443 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
444 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
445 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
446 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
447 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata + appended_data)
448 3a19e99b Sofia Papagiannaki
449 3a19e99b Sofia Papagiannaki
    def test_if_modified_since_invalid_date(self):
450 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
451 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
452 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
453 3a19e99b Sofia Papagiannaki
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
454 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
455 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, odata)
456 3a19e99b Sofia Papagiannaki
457 3a19e99b Sofia Papagiannaki
    def test_if_not_modified_since(self):
458 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
459 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
460 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
461 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
462 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
463 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
464 3a19e99b Sofia Papagiannaki
465 3a19e99b Sofia Papagiannaki
        # Check unmodified
466 3a19e99b Sofia Papagiannaki
        t1 = t + datetime.timedelta(seconds=1)
467 3a19e99b Sofia Papagiannaki
        t1_formats = map(t1.strftime, DATE_FORMATS)
468 3a19e99b Sofia Papagiannaki
        for t in t1_formats:
469 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
470 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
471 2a194295 Sofia Papagiannaki
            self.assertEqual(r.content, odata)
472 3a19e99b Sofia Papagiannaki
473 3a19e99b Sofia Papagiannaki
        # modify object
474 3a19e99b Sofia Papagiannaki
        _time.sleep(2)
475 3a19e99b Sofia Papagiannaki
        self.append_object_data(cname, oname)
476 3a19e99b Sofia Papagiannaki
477 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
478 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
479 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
480 3a19e99b Sofia Papagiannaki
        t2 = t - datetime.timedelta(seconds=1)
481 3a19e99b Sofia Papagiannaki
        t2_formats = map(t2.strftime, DATE_FORMATS)
482 3a19e99b Sofia Papagiannaki
483 5fe43b8c Sofia Papagiannaki
        # check modified
484 3a19e99b Sofia Papagiannaki
        for t in t2_formats:
485 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
486 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
487 3a19e99b Sofia Papagiannaki
488 800af189 Sofia Papagiannaki
        # modify account: update object meta
489 3a19e99b Sofia Papagiannaki
        _time.sleep(1)
490 3a19e99b Sofia Papagiannaki
        self.update_object_meta(cname, oname, {'foo': 'bar'})
491 3a19e99b Sofia Papagiannaki
492 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
493 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
494 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
495 3a19e99b Sofia Papagiannaki
        t3 = t - datetime.timedelta(seconds=1)
496 3a19e99b Sofia Papagiannaki
        t3_formats = map(t3.strftime, DATE_FORMATS)
497 3a19e99b Sofia Papagiannaki
498 5fe43b8c Sofia Papagiannaki
        # check modified
499 3a19e99b Sofia Papagiannaki
        for t in t3_formats:
500 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
501 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
502 3a19e99b Sofia Papagiannaki
503 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since(self):
504 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
505 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
506 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
507 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
508 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
509 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
510 3a19e99b Sofia Papagiannaki
        t = t + datetime.timedelta(seconds=1)
511 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
512 3a19e99b Sofia Papagiannaki
513 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
514 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
515 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
516 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.content, odata)
517 3a19e99b Sofia Papagiannaki
518 3a19e99b Sofia Papagiannaki
    def test_if_unmodified_since_precondition_failed(self):
519 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
520 5fe43b8c Sofia Papagiannaki
        oname, odata = self.upload_object(cname)[:-1]
521 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
522 3a19e99b Sofia Papagiannaki
        object_info = self.get_object_info(cname, oname)
523 3a19e99b Sofia Papagiannaki
        last_modified = object_info['Last-Modified']
524 3a19e99b Sofia Papagiannaki
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
525 3a19e99b Sofia Papagiannaki
        t = t - datetime.timedelta(seconds=1)
526 3a19e99b Sofia Papagiannaki
        t_formats = map(t.strftime, DATE_FORMATS)
527 3a19e99b Sofia Papagiannaki
528 3a19e99b Sofia Papagiannaki
        for tf in t_formats:
529 3a19e99b Sofia Papagiannaki
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
530 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 412)
531 3a19e99b Sofia Papagiannaki
532 3a19e99b Sofia Papagiannaki
    def test_hashes(self):
533 3a19e99b Sofia Papagiannaki
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
534 3a19e99b Sofia Papagiannaki
        cname = self.containers[0]
535 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(cname, length=l)[:-1]
536 3a19e99b Sofia Papagiannaki
        size = len(odata)
537 3a19e99b Sofia Papagiannaki
538 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
539 3a19e99b Sofia Papagiannaki
        r = self.get('%s?format=json&hashmap' % url)
540 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
541 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
542 3a19e99b Sofia Papagiannaki
543 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
544 3a19e99b Sofia Papagiannaki
        block_size = body['block_size']
545 3a19e99b Sofia Papagiannaki
        block_num = size / block_size if size / block_size == 0 else\
546 3a19e99b Sofia Papagiannaki
            size / block_size + 1
547 3a19e99b Sofia Papagiannaki
        self.assertTrue(len(hashes), block_num)
548 3a19e99b Sofia Papagiannaki
        i = 0
549 3a19e99b Sofia Papagiannaki
        for h in hashes:
550 3a19e99b Sofia Papagiannaki
            start = i * block_size
551 3a19e99b Sofia Papagiannaki
            end = (i + 1) * block_size
552 5fe43b8c Sofia Papagiannaki
            hash = merkle(odata[start:end])
553 3a19e99b Sofia Papagiannaki
            self.assertEqual(h, hash)
554 3a19e99b Sofia Papagiannaki
            i += 1
555 3a19e99b Sofia Papagiannaki
556 3a19e99b Sofia Papagiannaki
557 3a19e99b Sofia Papagiannaki
class ObjectPut(PithosAPITest):
558 3a19e99b Sofia Papagiannaki
    def setUp(self):
559 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
560 bc4f25c0 Sofia Papagiannaki
        self.container = get_random_name()
561 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
562 3a19e99b Sofia Papagiannaki
563 3a19e99b Sofia Papagiannaki
    def test_upload(self):
564 3a19e99b Sofia Papagiannaki
        cname = self.container
565 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
566 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
567 3a19e99b Sofia Papagiannaki
        meta = {'test': 'test1'}
568 3a19e99b Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
569 3a19e99b Sofia Papagiannaki
                       for k, v in meta.iteritems())
570 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
571 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, content_type='application/pdf', **headers)
572 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
573 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
574 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
575 3a19e99b Sofia Papagiannaki
576 3a19e99b Sofia Papagiannaki
        info = self.get_object_info(cname, oname)
577 3a19e99b Sofia Papagiannaki
578 3a19e99b Sofia Papagiannaki
        # assert object meta
579 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in info)
580 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
581 3a19e99b Sofia Papagiannaki
582 3a19e99b Sofia Papagiannaki
        # assert content-type
583 3a19e99b Sofia Papagiannaki
        self.assertEqual(info['content-type'], 'application/pdf')
584 3a19e99b Sofia Papagiannaki
585 3a19e99b Sofia Papagiannaki
        # assert uploaded content
586 3a19e99b Sofia Papagiannaki
        r = self.get(url)
587 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
588 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
589 3a19e99b Sofia Papagiannaki
590 3a19e99b Sofia Papagiannaki
    def test_maximum_upload_size_exceeds(self):
591 3a19e99b Sofia Papagiannaki
        cname = self.container
592 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
593 3a19e99b Sofia Papagiannaki
594 3a19e99b Sofia Papagiannaki
        # set container quota to 100
595 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname)
596 3a19e99b Sofia Papagiannaki
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
597 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
598 3a19e99b Sofia Papagiannaki
599 3a19e99b Sofia Papagiannaki
        info = self.get_container_info(cname)
600 3a19e99b Sofia Papagiannaki
        length = int(info['X-Container-Policy-Quota']) + 1
601 3a19e99b Sofia Papagiannaki
602 3a19e99b Sofia Papagiannaki
        data = get_random_data(length)
603 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
604 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
605 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 413)
606 3a19e99b Sofia Papagiannaki
607 3a19e99b Sofia Papagiannaki
    def test_upload_with_name_containing_slash(self):
608 3a19e99b Sofia Papagiannaki
        cname = self.container
609 bc4f25c0 Sofia Papagiannaki
        oname = '/%s' % get_random_name()
610 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
611 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
612 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data)
613 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
614 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
615 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
616 3a19e99b Sofia Papagiannaki
617 3a19e99b Sofia Papagiannaki
        r = self.get(url)
618 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
619 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
620 3a19e99b Sofia Papagiannaki
621 3a19e99b Sofia Papagiannaki
    def test_upload_unprocessable_entity(self):
622 3a19e99b Sofia Papagiannaki
        cname = self.container
623 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
624 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
625 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
626 3a19e99b Sofia Papagiannaki
        r = self.put(url, data=data, HTTP_ETAG='123')
627 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 422)
628 3a19e99b Sofia Papagiannaki
629 3a19e99b Sofia Papagiannaki
#    def test_chunked_transfer(self):
630 3a19e99b Sofia Papagiannaki
#        cname = self.container
631 bc4f25c0 Sofia Papagiannaki
#        oname = '/%s' % get_random_name()
632 5fe43b8c Sofia Papagiannaki
#        data = get_random_data()
633 3a19e99b Sofia Papagiannaki
#        url = join_urls(self.pithos_path, self.user, cname, oname)
634 3a19e99b Sofia Papagiannaki
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
635 3a19e99b Sofia Papagiannaki
#        self.assertEqual(r.status_code, 201)
636 3a19e99b Sofia Papagiannaki
#        self.assertTrue('ETag' in r)
637 3a19e99b Sofia Papagiannaki
#        self.assertTrue('X-Object-Version' in r)
638 3a19e99b Sofia Papagiannaki
639 3a19e99b Sofia Papagiannaki
    def test_manifestation(self):
640 3a19e99b Sofia Papagiannaki
        cname = self.container
641 3a19e99b Sofia Papagiannaki
        prefix = 'myobject/'
642 3a19e99b Sofia Papagiannaki
        data = ''
643 3a19e99b Sofia Papagiannaki
        for i in range(random.randint(2, 10)):
644 3a19e99b Sofia Papagiannaki
            part = '%s%d' % (prefix, i)
645 3a19e99b Sofia Papagiannaki
            data += self.upload_object(cname, oname=part)[1]
646 3a19e99b Sofia Papagiannaki
647 3a19e99b Sofia Papagiannaki
        manifest = '%s/%s' % (cname, prefix)
648 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
649 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
650 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
651 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
652 3a19e99b Sofia Papagiannaki
653 3a19e99b Sofia Papagiannaki
        # assert object exists
654 3a19e99b Sofia Papagiannaki
        r = self.get(url)
655 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
656 3a19e99b Sofia Papagiannaki
657 3a19e99b Sofia Papagiannaki
        # assert its content
658 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
659 3a19e99b Sofia Papagiannaki
660 3a19e99b Sofia Papagiannaki
        # invalid manifestation
661 bc4f25c0 Sofia Papagiannaki
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
662 3a19e99b Sofia Papagiannaki
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
663 3a19e99b Sofia Papagiannaki
        r = self.get(url)
664 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
665 3a19e99b Sofia Papagiannaki
666 3a19e99b Sofia Papagiannaki
    def test_create_zero_length_object(self):
667 3a19e99b Sofia Papagiannaki
        cname = self.container
668 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
669 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
670 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
671 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
672 3a19e99b Sofia Papagiannaki
673 3a19e99b Sofia Papagiannaki
        r = self.get(url)
674 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
675 3a19e99b Sofia Papagiannaki
        self.assertEqual(int(r['Content-Length']), 0)
676 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, '')
677 3a19e99b Sofia Papagiannaki
678 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
679 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
680 3a19e99b Sofia Papagiannaki
        body = json.loads(r.content)
681 3a19e99b Sofia Papagiannaki
        hashes = body['hashes']
682 5fe43b8c Sofia Papagiannaki
        hash = merkle('')
683 3a19e99b Sofia Papagiannaki
        self.assertEqual(hashes, [hash])
684 3a19e99b Sofia Papagiannaki
685 3a19e99b Sofia Papagiannaki
    def test_create_object_by_hashmap(self):
686 3a19e99b Sofia Papagiannaki
        cname = self.container
687 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
688 3a19e99b Sofia Papagiannaki
689 3a19e99b Sofia Papagiannaki
        # upload an object
690 3a19e99b Sofia Papagiannaki
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
691 3a19e99b Sofia Papagiannaki
        # get it hashmap
692 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
693 3a19e99b Sofia Papagiannaki
        r = self.get('%s?hashmap=&format=json' % url)
694 3a19e99b Sofia Papagiannaki
695 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
696 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, cname, oname)
697 3a19e99b Sofia Papagiannaki
        r = self.put('%s?hashmap=' % url, data=r.content)
698 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
699 3a19e99b Sofia Papagiannaki
700 3a19e99b Sofia Papagiannaki
        r = self.get(url)
701 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
702 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, data)
703 3a19e99b Sofia Papagiannaki
704 3a19e99b Sofia Papagiannaki
705 3a19e99b Sofia Papagiannaki
class ObjectCopy(PithosAPITest):
706 3a19e99b Sofia Papagiannaki
    def setUp(self):
707 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
708 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
709 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
710 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
711 3a19e99b Sofia Papagiannaki
712 3a19e99b Sofia Papagiannaki
        url = join_urls(
713 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
714 3a19e99b Sofia Papagiannaki
        r = self.head(url)
715 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
716 3a19e99b Sofia Papagiannaki
717 3a19e99b Sofia Papagiannaki
    def test_copy(self):
718 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
719 3a19e99b Sofia Papagiannaki
                                    self.object):
720 3a19e99b Sofia Papagiannaki
            # copy object
721 bc4f25c0 Sofia Papagiannaki
            oname = get_random_name()
722 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, oname)
723 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
724 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
725 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
726 3a19e99b Sofia Papagiannaki
727 3a19e99b Sofia Papagiannaki
            # assert copy success
728 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
729 3a19e99b Sofia Papagiannaki
730 3a19e99b Sofia Papagiannaki
            # assert access the new object
731 3a19e99b Sofia Papagiannaki
            r = self.head(url)
732 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
733 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
734 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
735 3a19e99b Sofia Papagiannaki
736 3a19e99b Sofia Papagiannaki
            # assert etag is the same
737 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
738 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
739 3a19e99b Sofia Papagiannaki
740 3a19e99b Sofia Papagiannaki
    def test_copy_from_different_container(self):
741 3a19e99b Sofia Papagiannaki
        cname = 'c2'
742 3a19e99b Sofia Papagiannaki
        self.create_container(cname)
743 3a19e99b Sofia Papagiannaki
        with AssertMappingInvariant(self.get_object_info, self.container,
744 3a19e99b Sofia Papagiannaki
                                    self.object):
745 bc4f25c0 Sofia Papagiannaki
            oname = get_random_name()
746 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, cname, oname)
747 3a19e99b Sofia Papagiannaki
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
748 3a19e99b Sofia Papagiannaki
                         HTTP_X_COPY_FROM='/%s/%s' % (
749 3a19e99b Sofia Papagiannaki
                             self.container, self.object))
750 3a19e99b Sofia Papagiannaki
751 3a19e99b Sofia Papagiannaki
            # assert copy success
752 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 201)
753 3a19e99b Sofia Papagiannaki
754 3a19e99b Sofia Papagiannaki
            # assert access the new object
755 3a19e99b Sofia Papagiannaki
            r = self.head(url)
756 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
757 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Meta-Test' in r)
758 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
759 3a19e99b Sofia Papagiannaki
760 3a19e99b Sofia Papagiannaki
            # assert etag is the same
761 3a19e99b Sofia Papagiannaki
            self.assertTrue('X-Object-Hash' in r)
762 3a19e99b Sofia Papagiannaki
            self.assertEqual(r['X-Object-Hash'], self.etag)
763 3a19e99b Sofia Papagiannaki
764 3a19e99b Sofia Papagiannaki
    def test_copy_invalid(self):
765 3a19e99b Sofia Papagiannaki
        # copy from non-existent object
766 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
767 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
768 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
769 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
770 bc4f25c0 Sofia Papagiannaki
                         self.container, get_random_name()))
771 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
772 3a19e99b Sofia Papagiannaki
773 3a19e99b Sofia Papagiannaki
        # copy from non-existent container
774 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
775 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
776 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
777 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (
778 bc4f25c0 Sofia Papagiannaki
                         get_random_name(), self.object))
779 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
780 3a19e99b Sofia Papagiannaki
781 3a19e99b Sofia Papagiannaki
    def test_copy_dir(self):
782 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
783 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
784 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
785 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
786 3a19e99b Sofia Papagiannaki
        append = objects.append
787 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
788 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_name()),
789 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
790 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
791 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_name()),
792 6ee6677e Sofia Papagiannaki
                                  depth='2')[0])
793 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
794 3a19e99b Sofia Papagiannaki
795 3a19e99b Sofia Papagiannaki
        # copy dir
796 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
797 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
798 3a19e99b Sofia Papagiannaki
                        copy_folder)
799 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
800 3a19e99b Sofia Papagiannaki
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
801 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
802 3a19e99b Sofia Papagiannaki
803 3a19e99b Sofia Papagiannaki
        for obj in objects:
804 3a19e99b Sofia Papagiannaki
            # assert object exists
805 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
806 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
807 3a19e99b Sofia Papagiannaki
            r = self.head(url)
808 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
809 3a19e99b Sofia Papagiannaki
810 3a19e99b Sofia Papagiannaki
            # assert metadata
811 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, obj)
812 3a19e99b Sofia Papagiannaki
            for k in meta.keys():
813 6ee6677e Sofia Papagiannaki
                key = 'X-Object-Meta-%s' % k
814 6ee6677e Sofia Papagiannaki
                self.assertTrue(key in r)
815 6ee6677e Sofia Papagiannaki
                self.assertEqual(r[key], meta[k])
816 3a19e99b Sofia Papagiannaki
817 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
818 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
819 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
820 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
821 3a19e99b Sofia Papagiannaki
        r = self.head(url)
822 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
823 3a19e99b Sofia Papagiannaki
824 3a19e99b Sofia Papagiannaki
825 3a19e99b Sofia Papagiannaki
class ObjectMove(PithosAPITest):
826 3a19e99b Sofia Papagiannaki
    def setUp(self):
827 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
828 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
829 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
830 3a19e99b Sofia Papagiannaki
        self.object, self.data = self.upload_object(self.container)[:-1]
831 3a19e99b Sofia Papagiannaki
832 3a19e99b Sofia Papagiannaki
        url = join_urls(
833 3a19e99b Sofia Papagiannaki
            self.pithos_path, self.user, self.container, self.object)
834 3a19e99b Sofia Papagiannaki
        r = self.head(url)
835 3a19e99b Sofia Papagiannaki
        self.etag = r['X-Object-Hash']
836 3a19e99b Sofia Papagiannaki
837 3a19e99b Sofia Papagiannaki
    def test_move(self):
838 3a19e99b Sofia Papagiannaki
        # move object
839 bc4f25c0 Sofia Papagiannaki
        oname = get_random_name()
840 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
841 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
842 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (
843 3a19e99b Sofia Papagiannaki
                         self.container, self.object))
844 3a19e99b Sofia Papagiannaki
845 3a19e99b Sofia Papagiannaki
        # assert move success
846 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
847 3a19e99b Sofia Papagiannaki
848 3a19e99b Sofia Papagiannaki
        # assert access the new object
849 3a19e99b Sofia Papagiannaki
        r = self.head(url)
850 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
851 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Meta-Test' in r)
852 3a19e99b Sofia Papagiannaki
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
853 3a19e99b Sofia Papagiannaki
854 3a19e99b Sofia Papagiannaki
        # assert etag is the same
855 3a19e99b Sofia Papagiannaki
        self.assertTrue('X-Object-Hash' in r)
856 3a19e99b Sofia Papagiannaki
857 3a19e99b Sofia Papagiannaki
        # assert the initial object has been deleted
858 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
859 3a19e99b Sofia Papagiannaki
                        self.object)
860 3a19e99b Sofia Papagiannaki
        r = self.head(url)
861 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
862 3a19e99b Sofia Papagiannaki
863 3a19e99b Sofia Papagiannaki
    def test_move_dir(self):
864 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
865 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
866 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
867 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
868 3a19e99b Sofia Papagiannaki
        append = objects.append
869 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
870 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_name()),
871 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
872 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
873 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_name()),
874 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
875 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
876 3a19e99b Sofia Papagiannaki
877 3a19e99b Sofia Papagiannaki
        # move dir
878 3a19e99b Sofia Papagiannaki
        copy_folder = self.create_folder(self.container)[0]
879 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
880 3a19e99b Sofia Papagiannaki
                        copy_folder)
881 3a19e99b Sofia Papagiannaki
        r = self.put('%s?delimiter=/' % url, data='',
882 3a19e99b Sofia Papagiannaki
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
883 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
884 3a19e99b Sofia Papagiannaki
885 3a19e99b Sofia Papagiannaki
        for obj in objects:
886 3a19e99b Sofia Papagiannaki
            # assert initial object does not exist
887 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
888 3a19e99b Sofia Papagiannaki
            r = self.head(url)
889 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
890 3a19e99b Sofia Papagiannaki
891 3a19e99b Sofia Papagiannaki
            # assert new object was created
892 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
893 3a19e99b Sofia Papagiannaki
                            obj.replace(folder, copy_folder))
894 3a19e99b Sofia Papagiannaki
            r = self.head(url)
895 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 200)
896 3a19e99b Sofia Papagiannaki
897 3a19e99b Sofia Papagiannaki
        # assert other has not been created under copy folder
898 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
899 3a19e99b Sofia Papagiannaki
                        '%s/%s' % (copy_folder,
900 3a19e99b Sofia Papagiannaki
                                   other.replace(folder, copy_folder)))
901 3a19e99b Sofia Papagiannaki
        r = self.head(url)
902 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
903 3a19e99b Sofia Papagiannaki
904 3a19e99b Sofia Papagiannaki
905 3a19e99b Sofia Papagiannaki
class ObjectPost(PithosAPITest):
906 3a19e99b Sofia Papagiannaki
    def setUp(self):
907 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
908 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
909 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
910 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
911 3a19e99b Sofia Papagiannaki
912 3a19e99b Sofia Papagiannaki
    def test_update_meta(self):
913 3a19e99b Sofia Papagiannaki
        with AssertUUidInvariant(self.get_object_info,
914 3a19e99b Sofia Papagiannaki
                                 self.container,
915 3a19e99b Sofia Papagiannaki
                                 self.object):
916 3a19e99b Sofia Papagiannaki
            # update metadata
917 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 256}
918 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
919 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
920 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container,
921 3a19e99b Sofia Papagiannaki
                            self.object)
922 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
923 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 202)
924 3a19e99b Sofia Papagiannaki
925 3a19e99b Sofia Papagiannaki
            # assert metadata have been updated
926 3a19e99b Sofia Papagiannaki
            meta = self.get_object_meta(self.container, self.object)
927 3a19e99b Sofia Papagiannaki
928 3a19e99b Sofia Papagiannaki
            for k, v in d.items():
929 6ee6677e Sofia Papagiannaki
                self.assertTrue(k.title() in meta)
930 6ee6677e Sofia Papagiannaki
                self.assertTrue(meta[k.title()], v)
931 3a19e99b Sofia Papagiannaki
932 3a19e99b Sofia Papagiannaki
            # Header key too large
933 3a19e99b Sofia Papagiannaki
            d = {'a' * 115: 'b' * 256}
934 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
935 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
936 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
937 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
938 3a19e99b Sofia Papagiannaki
939 3a19e99b Sofia Papagiannaki
            # Header value too large
940 3a19e99b Sofia Papagiannaki
            d = {'a' * 114: 'b' * 257}
941 3a19e99b Sofia Papagiannaki
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
942 3a19e99b Sofia Papagiannaki
                          k, v in d.items())
943 3a19e99b Sofia Papagiannaki
            r = self.post(url, content_type='', **kwargs)
944 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 400)
945 3a19e99b Sofia Papagiannaki
946 3a19e99b Sofia Papagiannaki
#            # Check utf-8 meta
947 3a19e99b Sofia Papagiannaki
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
948 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
949 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
950 3a19e99b Sofia Papagiannaki
#            url = join_urls(self.pithos_path, self.user, self.container,
951 3a19e99b Sofia Papagiannaki
#                            self.object)
952 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
953 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 202)
954 3a19e99b Sofia Papagiannaki
#
955 3a19e99b Sofia Papagiannaki
#            # assert metadata have been updated
956 3a19e99b Sofia Papagiannaki
#            meta = self.get_object_meta(self.container, self.object)
957 3a19e99b Sofia Papagiannaki
#
958 3a19e99b Sofia Papagiannaki
#            for k, v in d.items():
959 3a19e99b Sofia Papagiannaki
#                key = 'X-Object-Meta-%s' % k.title()
960 3a19e99b Sofia Papagiannaki
#                self.assertTrue(key in meta)
961 3a19e99b Sofia Papagiannaki
#                self.assertTrue(meta[key], v)
962 3a19e99b Sofia Papagiannaki
#
963 3a19e99b Sofia Papagiannaki
#            # Header key too large
964 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * (256 / 2)}
965 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
966 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
967 3a19e99b Sofia Papagiannaki
#            r = self.post(url, content_type='', **kwargs)
968 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
969 3a19e99b Sofia Papagiannaki
#
970 3a19e99b Sofia Papagiannaki
#            # Header value too large
971 3a19e99b Sofia Papagiannaki
#            d = {'α' * 114: 'β' * 256}
972 3a19e99b Sofia Papagiannaki
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
973 3a19e99b Sofia Papagiannaki
#                          k, v in d.items())
974 3a19e99b Sofia Papagiannaki
#            r = self.udpate(url, content_type='', **kwargs)
975 3a19e99b Sofia Papagiannaki
#            self.assertEqual(r.status_code, 400)
976 3a19e99b Sofia Papagiannaki
977 3a19e99b Sofia Papagiannaki
    def test_update_object(self):
978 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
979 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
980 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
981 3a19e99b Sofia Papagiannaki
                block_size + 1, 2 * block_size))[:2]
982 3a19e99b Sofia Papagiannaki
983 3a19e99b Sofia Papagiannaki
        length = len(odata)
984 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
985 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
986 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
987 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
988 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
989 3a19e99b Sofia Papagiannaki
990 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
991 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
992 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
993 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
994 3a19e99b Sofia Papagiannaki
995 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
996 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
997 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
998 3a19e99b Sofia Papagiannaki
                                     data)
999 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
1000 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
1001 3a19e99b Sofia Papagiannaki
        else:
1002 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
1003 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
1004 3a19e99b Sofia Papagiannaki
1005 3a19e99b Sofia Papagiannaki
        # check modified object
1006 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1007 3a19e99b Sofia Papagiannaki
1008 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1009 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
1010 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
1011 3a19e99b Sofia Papagiannaki
1012 3a19e99b Sofia Papagiannaki
    def test_update_object_divided_by_blocksize(self):
1013 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1014 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(self.container,
1015 3a19e99b Sofia Papagiannaki
                                          length=2 * block_size)[:2]
1016 3a19e99b Sofia Papagiannaki
1017 3a19e99b Sofia Papagiannaki
        length = len(odata)
1018 3a19e99b Sofia Papagiannaki
        first_byte_pos = block_size
1019 3a19e99b Sofia Papagiannaki
        last_byte_pos = 2 * block_size - 1
1020 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1021 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1022 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1023 3a19e99b Sofia Papagiannaki
1024 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1025 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1026 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1027 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1028 3a19e99b Sofia Papagiannaki
1029 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1030 3a19e99b Sofia Papagiannaki
        self.assertTrue('ETag' in r)
1031 3a19e99b Sofia Papagiannaki
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1032 3a19e99b Sofia Papagiannaki
                                     data)
1033 3a19e99b Sofia Papagiannaki
        if pithos_settings.UPDATE_MD5:
1034 5fe43b8c Sofia Papagiannaki
            etag = md5_hash(updated_data)
1035 3a19e99b Sofia Papagiannaki
        else:
1036 3a19e99b Sofia Papagiannaki
            etag = merkle(updated_data)
1037 3a19e99b Sofia Papagiannaki
        #self.assertEqual(r['ETag'], etag)
1038 3a19e99b Sofia Papagiannaki
1039 3a19e99b Sofia Papagiannaki
        # check modified object
1040 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1041 3a19e99b Sofia Papagiannaki
1042 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)
1043 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.content, updated_data)
1044 3a19e99b Sofia Papagiannaki
        self.assertEqual(etag, r['ETag'])
1045 3a19e99b Sofia Papagiannaki
1046 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_content_length(self):
1047 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1048 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1049 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(
1050 3a19e99b Sofia Papagiannaki
                block_size + 1, 2 * block_size))[:2]
1051 3a19e99b Sofia Papagiannaki
1052 3a19e99b Sofia Papagiannaki
        length = len(odata)
1053 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1054 3a19e99b Sofia Papagiannaki
        last_byte_pos = random.randint(block_size + 1, length - 1)
1055 3a19e99b Sofia Papagiannaki
        partial = last_byte_pos - first_byte_pos + 1
1056 3a19e99b Sofia Papagiannaki
        data = get_random_data(partial)
1057 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1058 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1059 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range,
1060 6ee6677e Sofia Papagiannaki
                  'CONTENT_LENGTH': str(partial + 1)}
1061 3a19e99b Sofia Papagiannaki
1062 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1063 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, **kwargs)
1064 3a19e99b Sofia Papagiannaki
1065 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1066 3a19e99b Sofia Papagiannaki
1067 3a19e99b Sofia Papagiannaki
    def test_update_object_invalid_range(self):
1068 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1069 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1070 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1071 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1072 3a19e99b Sofia Papagiannaki
1073 3a19e99b Sofia Papagiannaki
        length = len(odata)
1074 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1075 3a19e99b Sofia Papagiannaki
        last_byte_pos = first_byte_pos - 1
1076 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1077 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1078 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1079 3a19e99b Sofia Papagiannaki
1080 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1081 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1082 3a19e99b Sofia Papagiannaki
1083 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1084 3a19e99b Sofia Papagiannaki
1085 3a19e99b Sofia Papagiannaki
    def test_update_object_out_of_limits(self):
1086 3a19e99b Sofia Papagiannaki
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1087 3a19e99b Sofia Papagiannaki
        oname, odata = self.upload_object(
1088 3a19e99b Sofia Papagiannaki
            self.container, length=random.randint(block_size + 1,
1089 3a19e99b Sofia Papagiannaki
                                                  2 * block_size))[:2]
1090 3a19e99b Sofia Papagiannaki
1091 3a19e99b Sofia Papagiannaki
        length = len(odata)
1092 3a19e99b Sofia Papagiannaki
        first_byte_pos = random.randint(1, block_size)
1093 3a19e99b Sofia Papagiannaki
        last_byte_pos = length + 1
1094 3a19e99b Sofia Papagiannaki
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1095 3a19e99b Sofia Papagiannaki
        kwargs = {'content_type': 'application/octet-stream',
1096 3a19e99b Sofia Papagiannaki
                  'HTTP_CONTENT_RANGE': range}
1097 3a19e99b Sofia Papagiannaki
1098 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1099 5fe43b8c Sofia Papagiannaki
        r = self.post(url, data=get_random_data(), **kwargs)
1100 3a19e99b Sofia Papagiannaki
1101 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 416)
1102 3a19e99b Sofia Papagiannaki
1103 3a19e99b Sofia Papagiannaki
    def test_append(self):
1104 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1105 5fe43b8c Sofia Papagiannaki
        length = len(data)
1106 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1107 3a19e99b Sofia Papagiannaki
                        self.object)
1108 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1109 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_LENGTH=str(length),
1110 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*')
1111 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1112 3a19e99b Sofia Papagiannaki
1113 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1114 3a19e99b Sofia Papagiannaki
        content = r.content
1115 3a19e99b Sofia Papagiannaki
        self.assertEqual(len(content), len(self.object_data) + length)
1116 3a19e99b Sofia Papagiannaki
        self.assertEqual(content, self.object_data + data)
1117 3a19e99b Sofia Papagiannaki
1118 5fe43b8c Sofia Papagiannaki
    # TODO Fix the test
1119 5fe43b8c Sofia Papagiannaki
    def _test_update_with_chunked_transfer(self):
1120 5fe43b8c Sofia Papagiannaki
        data = get_random_data()
1121 5fe43b8c Sofia Papagiannaki
        length = len(data)
1122 3a19e99b Sofia Papagiannaki
1123 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1124 3a19e99b Sofia Papagiannaki
                        self.object)
1125 3a19e99b Sofia Papagiannaki
        r = self.post(url, data=data, content_type='application/octet-stream',
1126 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1127 3a19e99b Sofia Papagiannaki
                      HTTP_TRANSFER_ENCODING='chunked')
1128 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1129 3a19e99b Sofia Papagiannaki
1130 3a19e99b Sofia Papagiannaki
        # check modified object
1131 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1132 3a19e99b Sofia Papagiannaki
        content = r.content
1133 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[0:length], data)
1134 3a19e99b Sofia Papagiannaki
        self.assertEqual(content[length:], self.object_data[length:])
1135 3a19e99b Sofia Papagiannaki
1136 3a19e99b Sofia Papagiannaki
    def test_update_from_other_object(self):
1137 3a19e99b Sofia Papagiannaki
        src = self.object
1138 bc4f25c0 Sofia Papagiannaki
        dest = get_random_name()
1139 3a19e99b Sofia Papagiannaki
1140 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1141 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1142 3a19e99b Sofia Papagiannaki
        source_data = r.content
1143 3a19e99b Sofia Papagiannaki
        source_meta = self.get_object_info(self.container, src)
1144 3a19e99b Sofia Papagiannaki
1145 3a19e99b Sofia Papagiannaki
        # update zero length object
1146 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1147 3a19e99b Sofia Papagiannaki
        r = self.put(url, data='')
1148 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1149 3a19e99b Sofia Papagiannaki
1150 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1151 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes */*',
1152 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1153 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1154 3a19e99b Sofia Papagiannaki
1155 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1156 3a19e99b Sofia Papagiannaki
        dest_data = r.content
1157 3a19e99b Sofia Papagiannaki
        dest_meta = self.get_object_info(self.container, dest)
1158 3a19e99b Sofia Papagiannaki
1159 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_data, dest_data)
1160 3a19e99b Sofia Papagiannaki
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1161 3a19e99b Sofia Papagiannaki
        self.assertEqual(source_meta['X-Object-Hash'],
1162 3a19e99b Sofia Papagiannaki
                         dest_meta['X-Object-Hash'])
1163 3a19e99b Sofia Papagiannaki
        self.assertTrue(
1164 3a19e99b Sofia Papagiannaki
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1165 3a19e99b Sofia Papagiannaki
1166 3a19e99b Sofia Papagiannaki
    def test_update_range_from_other_object(self):
1167 3a19e99b Sofia Papagiannaki
        src = self.object
1168 bc4f25c0 Sofia Papagiannaki
        dest = get_random_name()
1169 3a19e99b Sofia Papagiannaki
1170 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1171 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1172 3a19e99b Sofia Papagiannaki
        source_data = r.content
1173 3a19e99b Sofia Papagiannaki
1174 3a19e99b Sofia Papagiannaki
        # update zero length object
1175 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1176 5fe43b8c Sofia Papagiannaki
        initial_data = get_random_data()
1177 5fe43b8c Sofia Papagiannaki
        length = len(initial_data)
1178 a1f429b2 Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1179 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1180 3a19e99b Sofia Papagiannaki
1181 3a19e99b Sofia Papagiannaki
        offset = random.randint(1, length - 2)
1182 3a19e99b Sofia Papagiannaki
        upto = random.randint(offset, length - 1)
1183 3a19e99b Sofia Papagiannaki
        r = self.post(url,
1184 3a19e99b Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1185 3a19e99b Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1186 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1187 3a19e99b Sofia Papagiannaki
1188 3a19e99b Sofia Papagiannaki
        r = self.get(url)
1189 3a19e99b Sofia Papagiannaki
        content = r.content
1190 a1f429b2 Sofia Papagiannaki
        self.assertEqual(content, (initial_data[:offset] +
1191 a1f429b2 Sofia Papagiannaki
                                   source_data[:upto - offset + 1] +
1192 a1f429b2 Sofia Papagiannaki
                                   initial_data[upto + 1:]))
1193 3a19e99b Sofia Papagiannaki
1194 e4c7cbeb Sofia Papagiannaki
    def test_update_range_from_invalid_other_object(self):
1195 e4c7cbeb Sofia Papagiannaki
        src = self.object
1196 e4c7cbeb Sofia Papagiannaki
        dest = get_random_name()
1197 e4c7cbeb Sofia Papagiannaki
1198 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, src)
1199 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1200 e4c7cbeb Sofia Papagiannaki
1201 e4c7cbeb Sofia Papagiannaki
        # update zero length object
1202 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1203 e4c7cbeb Sofia Papagiannaki
        initial_data = get_random_data()
1204 e4c7cbeb Sofia Papagiannaki
        length = len(initial_data)
1205 e4c7cbeb Sofia Papagiannaki
        r = self.put(url, data=initial_data)
1206 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
1207 e4c7cbeb Sofia Papagiannaki
1208 e4c7cbeb Sofia Papagiannaki
        offset = random.randint(1, length - 2)
1209 e4c7cbeb Sofia Papagiannaki
        upto = random.randint(offset, length - 1)
1210 e4c7cbeb Sofia Papagiannaki
1211 e4c7cbeb Sofia Papagiannaki
        # source object does not start with /
1212 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1213 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1214 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1215 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 400)
1216 e4c7cbeb Sofia Papagiannaki
1217 e4c7cbeb Sofia Papagiannaki
        # source object does not exist
1218 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1219 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1220 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1221 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1222 e4c7cbeb Sofia Papagiannaki
1223 00064fb8 Sofia Papagiannaki
    def test_restore_version(self):
1224 e4c7cbeb Sofia Papagiannaki
        info = self.get_object_info(self.container, self.object)
1225 00064fb8 Sofia Papagiannaki
        v = []
1226 00064fb8 Sofia Papagiannaki
        append = v.append
1227 00064fb8 Sofia Papagiannaki
        append((info['X-Object-Version'],
1228 00064fb8 Sofia Papagiannaki
                int(info['Content-Length']),
1229 00064fb8 Sofia Papagiannaki
                self.object_data))
1230 e4c7cbeb Sofia Papagiannaki
1231 e4c7cbeb Sofia Papagiannaki
        # update object
1232 00064fb8 Sofia Papagiannaki
        data, r = self.upload_object(self.container, self.object,
1233 00064fb8 Sofia Papagiannaki
                                     length=v[0][1] - 1)[1:]
1234 e4c7cbeb Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1235 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1236 00064fb8 Sofia Papagiannaki
        # v[0][1] > v[1][1]
1237 e4c7cbeb Sofia Papagiannaki
1238 00064fb8 Sofia Papagiannaki
        # update with the previous version
1239 e4c7cbeb Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1240 e4c7cbeb Sofia Papagiannaki
                        self.object)
1241 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1242 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1243 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1244 e4c7cbeb Sofia Papagiannaki
                                                       self.object),
1245 00064fb8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=v[0][0],
1246 00064fb8 Sofia Papagiannaki
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1247 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1248 00064fb8 Sofia Papagiannaki
        # v[2][1] = v[0][1] > v[1][1]
1249 e4c7cbeb Sofia Papagiannaki
1250 e4c7cbeb Sofia Papagiannaki
        # check content
1251 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1252 e4c7cbeb Sofia Papagiannaki
        content = r.content
1253 00064fb8 Sofia Papagiannaki
        self.assertEqual(len(content), v[0][1])
1254 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(content, self.object_data)
1255 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(content), content))
1256 e4c7cbeb Sofia Papagiannaki
1257 00064fb8 Sofia Papagiannaki
        # update object content(v4) > content(v2)
1258 00064fb8 Sofia Papagiannaki
        data, r = self.upload_object(self.container, self.object,
1259 00064fb8 Sofia Papagiannaki
                                     length=v[2][1] + 1)[1:]
1260 e4c7cbeb Sofia Papagiannaki
        self.assertTrue('X-Object-Version' in r)
1261 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1262 00064fb8 Sofia Papagiannaki
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1263 e4c7cbeb Sofia Papagiannaki
1264 e4c7cbeb Sofia Papagiannaki
        # update with the previous version
1265 e4c7cbeb Sofia Papagiannaki
        r = self.post(url,
1266 e4c7cbeb Sofia Papagiannaki
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1267 e4c7cbeb Sofia Papagiannaki
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1268 e4c7cbeb Sofia Papagiannaki
                                                       self.object),
1269 00064fb8 Sofia Papagiannaki
                      HTTP_X_SOURCE_VERSION=v[2][0],
1270 00064fb8 Sofia Papagiannaki
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1271 e4c7cbeb Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1272 00064fb8 Sofia Papagiannaki
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1273 e4c7cbeb Sofia Papagiannaki
1274 e4c7cbeb Sofia Papagiannaki
        # check content
1275 e4c7cbeb Sofia Papagiannaki
        r = self.get(url)
1276 00064fb8 Sofia Papagiannaki
        data = r.content
1277 00064fb8 Sofia Papagiannaki
        self.assertEqual(data, v[2][2])
1278 00064fb8 Sofia Papagiannaki
        append((r['X-Object-Version'], len(data), data))
1279 e4c7cbeb Sofia Papagiannaki
1280 3a19e99b Sofia Papagiannaki
1281 3a19e99b Sofia Papagiannaki
class ObjectDelete(PithosAPITest):
1282 3a19e99b Sofia Papagiannaki
    def setUp(self):
1283 3a19e99b Sofia Papagiannaki
        PithosAPITest.setUp(self)
1284 3a19e99b Sofia Papagiannaki
        self.container = 'c1'
1285 3a19e99b Sofia Papagiannaki
        self.create_container(self.container)
1286 3a19e99b Sofia Papagiannaki
        self.object, self.object_data = self.upload_object(self.container)[:2]
1287 3a19e99b Sofia Papagiannaki
1288 3a19e99b Sofia Papagiannaki
    def test_delete(self):
1289 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1290 3a19e99b Sofia Papagiannaki
                        self.object)
1291 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1292 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1293 3a19e99b Sofia Papagiannaki
1294 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1295 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1296 3a19e99b Sofia Papagiannaki
1297 3a19e99b Sofia Papagiannaki
    def test_delete_non_existent(self):
1298 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container,
1299 bc4f25c0 Sofia Papagiannaki
                        get_random_name())
1300 3a19e99b Sofia Papagiannaki
        r = self.delete(url)
1301 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 404)
1302 3a19e99b Sofia Papagiannaki
1303 3a19e99b Sofia Papagiannaki
    def test_delete_dir(self):
1304 3a19e99b Sofia Papagiannaki
        folder = self.create_folder(self.container)[0]
1305 3a19e99b Sofia Papagiannaki
        subfolder = self.create_folder(
1306 bc4f25c0 Sofia Papagiannaki
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1307 3a19e99b Sofia Papagiannaki
        objects = [subfolder]
1308 3a19e99b Sofia Papagiannaki
        append = objects.append
1309 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1310 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (folder, get_random_name()),
1311 6ee6677e Sofia Papagiannaki
                                  depth='1')[0])
1312 3a19e99b Sofia Papagiannaki
        append(self.upload_object(self.container,
1313 bc4f25c0 Sofia Papagiannaki
                                  '%s/%s' % (subfolder, get_random_name()),
1314 6ee6677e Sofia Papagiannaki
                                  depth='2')[0])
1315 3a19e99b Sofia Papagiannaki
        other = self.upload_object(self.container, strnextling(folder))[0]
1316 3a19e99b Sofia Papagiannaki
1317 3a19e99b Sofia Papagiannaki
        # move dir
1318 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1319 3a19e99b Sofia Papagiannaki
        r = self.delete('%s?delimiter=/' % url)
1320 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
1321 3a19e99b Sofia Papagiannaki
1322 3a19e99b Sofia Papagiannaki
        for obj in objects:
1323 3a19e99b Sofia Papagiannaki
            # assert object does not exist
1324 3a19e99b Sofia Papagiannaki
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1325 3a19e99b Sofia Papagiannaki
            r = self.head(url)
1326 3a19e99b Sofia Papagiannaki
            self.assertEqual(r.status_code, 404)
1327 3a19e99b Sofia Papagiannaki
1328 3a19e99b Sofia Papagiannaki
        # assert other has not been deleted
1329 3a19e99b Sofia Papagiannaki
        url = join_urls(self.pithos_path, self.user, self.container, other)
1330 3a19e99b Sofia Papagiannaki
        r = self.head(url)
1331 3a19e99b Sofia Papagiannaki
        self.assertEqual(r.status_code, 200)