Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 6c9df07c

History | View | Annotate | Download (74.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote, unquote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def test_get_object_meta(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, self.user, cname, oname)
68
        r = self.head(url)
69

    
70
        mandatory = ['Etag',
71
                     'Content-Length',
72
                     'Content-Type',
73
                     'Last-Modified',
74
                     'X-Object-Hash',
75
                     'X-Object-UUID',
76
                     'X-Object-Version',
77
                     'X-Object-Version-Timestamp',
78
                     'X-Object-Modified-By']
79
        for i in mandatory:
80
            self.assertTrue(i in r)
81

    
82
        r = self.post(url, content_type='',
83
                      HTTP_CONTENT_ENCODING='gzip',
84
                      HTTP_CONTENT_DISPOSITION=(
85
                          'attachment; filename="%s"' % oname))
86
        self.assertEqual(r.status_code, 202)
87

    
88
        r = self.head(url)
89
        for i in mandatory:
90
            self.assertTrue(i in r)
91
        self.assertTrue('Content-Encoding' in r)
92
        self.assertEqual(r['Content-Encoding'], 'gzip')
93
        self.assertTrue('Content-Disposition' in r)
94
        self.assertEqual(unquote(r['Content-Disposition']),
95
                         'attachment; filename="%s"' % oname)
96

    
97
        prefix = 'myobject/'
98
        data = ''
99
        for i in range(random.randint(2, 10)):
100
            part = '%s%d' % (prefix, i)
101
            data += self.upload_object(cname, oname=part)[1]
102

    
103
        manifest = '%s/%s' % (cname, prefix)
104
        oname = get_random_name()
105
        url = join_urls(self.pithos_path, self.user, cname, oname)
106
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107
        self.assertEqual(r.status_code, 201)
108

    
109
        r = self.head(url)
110
        for i in mandatory:
111
            self.assertTrue(i in r)
112
        self.assertTrue('X-Object-Manifest' in r)
113
        self.assertEqual(r['X-Object-Manifest'], manifest)
114

    
115

    
116
class ObjectGet(PithosAPITest):
117
    def setUp(self):
118
        PithosAPITest.setUp(self)
119
        self.containers = ['c1', 'c2']
120

    
121
        # create some containers
122
        for c in self.containers:
123
            self.create_container(c)
124

    
125
        # upload files
126
        self.objects = defaultdict(list)
127
        self.objects['c1'].append(self.upload_object('c1')[0])
128

    
129
    def test_versions(self):
130
        c = 'c1'
131
        o = self.objects[c][0]
132
        url = join_urls(self.pithos_path, self.user, c, o)
133

    
134
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135
        r = self.post(url, content_type='', **meta)
136
        self.assertEqual(r.status_code, 202)
137

    
138
        url = join_urls(self.pithos_path, self.user, c, o)
139
        r = self.get('%s?version=list&format=json' % url)
140
        self.assertEqual(r.status_code, 200)
141
        l1 = json.loads(r.content)['versions']
142
        self.assertEqual(len(l1), 2)
143

    
144
        # update meta
145
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147
        r = self.post(url, content_type='', **meta)
148
        self.assertEqual(r.status_code, 202)
149

    
150
        # assert a newly created version has been created
151
        r = self.get('%s?version=list&format=json' % url)
152
        self.assertEqual(r.status_code, 200)
153
        l2 = json.loads(r.content)['versions']
154
        self.assertEqual(len(l2), len(l1) + 1)
155
        self.assertEqual(l2[:-1], l1)
156

    
157
        vserial, _ = l2[-2]
158
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159
                         {'Quality': 'AAA'})
160

    
161
        # update data
162
        self.append_object_data(c, o)
163

    
164
        # assert a newly created version has been created
165
        r = self.get('%s?version=list&format=json' % url)
166
        self.assertEqual(r.status_code, 200)
167
        l3 = json.loads(r.content)['versions']
168
        self.assertEqual(len(l3), len(l2) + 1)
169
        self.assertEqual(l3[:-1], l2)
170

    
171
    def test_get_version(self):
172
        c = 'c1'
173
        o = self.objects[c][0]
174
        url = join_urls(self.pithos_path, self.user, c, o)
175

    
176
        # Update metadata
177
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178
        r = self.post(url, content_type='', **meta)
179
        self.assertEqual(r.status_code, 202)
180

    
181
        url = join_urls(self.pithos_path, self.user, c, o)
182
        r = self.get('%s?version=list&format=json' % url)
183
        self.assertEqual(r.status_code, 200)
184
        l = json.loads(r.content)['versions']
185
        self.assertEqual(len(l), 2)
186

    
187
        r = self.head('%s?version=%s' % (url, l[0][0]))
188
        self.assertEqual(r.status_code, 200)
189
        self.assertTrue('X-Object-Meta-Quality' not in r)
190

    
191
        r = self.head('%s?version=%s' % (url, l[1][0]))
192
        self.assertEqual(r.status_code, 200)
193
        self.assertTrue('X-Object-Meta-Quality' in r)
194

    
195
        # test invalid version
196
        r = self.head('%s?version=-1' % url)
197
        self.assertEqual(r.status_code, 404)
198

    
199
        other_name, other_data, r = self.upload_object(c)
200
        self.assertTrue('X-Object-Version' in r)
201
        other_version = r['X-Object-Version']
202

    
203
        self.assertTrue(o != other_name)
204

    
205
        r = self.get('%s?version=%s' % (url, other_version))
206
        self.assertEqual(r.status_code, 404)
207

    
208
        r = self.head('%s?version=%s' % (url, other_version))
209
        self.assertEqual(r.status_code, 404)
210

    
211
    def test_objects_with_trailing_spaces(self):
212
        # create object
213
        oname = self.upload_object('c1')[0]
214
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215

    
216
        r = self.get(quote('%s ' % url))
217
        self.assertEqual(r.status_code, 404)
218

    
219
        # delete object
220
        self.delete(url)
221

    
222
        r = self.get(url)
223
        self.assertEqual(r.status_code, 404)
224

    
225
        # upload object with trailing space
226
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227

    
228
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229
        r = self.get(url)
230
        self.assertEqual(r.status_code, 200)
231

    
232
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233
        r = self.get(url)
234
        self.assertEqual(r.status_code, 404)
235

    
236
    def test_get_partial(self):
237
        cname = self.containers[0]
238
        oname, odata = self.upload_object(cname, length=512)[:-1]
239
        url = join_urls(self.pithos_path, self.user, cname, oname)
240
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241
        self.assertEqual(r.status_code, 206)
242
        data = r.content
243
        self.assertEqual(data, odata[:500])
244
        self.assertTrue('Content-Range' in r)
245
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246
        self.assertTrue('Content-Type' in r)
247
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248

    
249
    def test_get_final_500(self):
250
        cname = self.containers[0]
251
        oname, odata = self.upload_object(cname, length=512)[:-1]
252
        size = len(odata)
253
        url = join_urls(self.pithos_path, self.user, cname, oname)
254
        r = self.get(url, HTTP_RANGE='bytes=-500')
255
        self.assertEqual(r.status_code, 206)
256
        self.assertEqual(r.content, odata[-500:])
257
        self.assertTrue('Content-Range' in r)
258
        self.assertEqual(r['Content-Range'],
259
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260
        self.assertTrue('Content-Type' in r)
261
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262

    
263
    def test_get_rest(self):
264
        cname = self.containers[0]
265
        oname, odata = self.upload_object(cname, length=512)[:-1]
266
        size = len(odata)
267
        url = join_urls(self.pithos_path, self.user, cname, oname)
268
        offset = len(odata) - random.randint(1, 512)
269
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270
        self.assertEqual(r.status_code, 206)
271
        self.assertEqual(r.content, odata[offset:])
272
        self.assertTrue('Content-Range' in r)
273
        self.assertEqual(r['Content-Range'],
274
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275
        self.assertTrue('Content-Type' in r)
276
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277

    
278
    def test_get_range_not_satisfiable(self):
279
        cname = self.containers[0]
280
        oname, odata = self.upload_object(cname, length=512)[:-1]
281
        url = join_urls(self.pithos_path, self.user, cname, oname)
282

    
283
        # TODO
284
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285
        #self.assertEqual(r.status_code, 416)
286

    
287
        offset = len(odata) + 1
288
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289
        self.assertEqual(r.status_code, 416)
290

    
291
    def test_multiple_range(self):
292
        cname = self.containers[0]
293
        oname, odata = self.upload_object(cname)[:-1]
294
        url = join_urls(self.pithos_path, self.user, cname, oname)
295

    
296
        l = ['0-499', '-500', '1000-']
297
        ranges = 'bytes=%s' % ','.join(l)
298
        r = self.get(url, HTTP_RANGE=ranges)
299
        self.assertEqual(r.status_code, 206)
300
        self.assertTrue('content-type' in r)
301
        p = re.compile(
302
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303
            re.I)
304
        m = p.match(r['content-type'])
305
        if m is None:
306
            self.fail('Invalid multiple range content type')
307
        boundary = m.groupdict()['boundary']
308
        cparts = r.content.split('--%s' % boundary)[1:-1]
309

    
310
        # assert content parts length
311
        self.assertEqual(len(cparts), len(l))
312

    
313
        # for each content part assert headers
314
        i = 0
315
        for cpart in cparts:
316
            content = cpart.split('\r\n')
317
            headers = content[1:3]
318
            content_range = headers[0].split(': ')
319
            self.assertEqual(content_range[0], 'Content-Range')
320

    
321
            r = l[i].split('-')
322
            if not r[0] and not r[1]:
323
                pass
324
            elif not r[0]:
325
                start = len(odata) - int(r[1])
326
                end = len(odata)
327
            elif not r[1]:
328
                start = int(r[0])
329
                end = len(odata)
330
            else:
331
                start = int(r[0])
332
                end = int(r[1]) + 1
333
            fdata = odata[start:end]
334
            sdata = '\r\n'.join(content[4:-1])
335
            self.assertEqual(len(fdata), len(sdata))
336
            self.assertEquals(fdata, sdata)
337
            i += 1
338

    
339
    def test_multiple_range_not_satisfiable(self):
340
        # perform get with multiple range
341
        cname = self.containers[0]
342
        oname, odata = self.upload_object(cname)[:-1]
343
        out_of_range = len(odata) + 1
344
        l = ['0-499', '-500', '%d-' % out_of_range]
345
        ranges = 'bytes=%s' % ','.join(l)
346
        url = join_urls(self.pithos_path, self.user, cname, oname)
347
        r = self.get(url, HTTP_RANGE=ranges)
348
        self.assertEqual(r.status_code, 416)
349

    
350
    def test_get_if_match(self):
351
        cname = self.containers[0]
352
        oname, odata = self.upload_object(cname)[:-1]
353

    
354
        # perform get with If-Match
355
        url = join_urls(self.pithos_path, self.user, cname, oname)
356

    
357
        if pithos_settings.UPDATE_MD5:
358
            etag = md5_hash(odata)
359
        else:
360
            etag = merkle(odata)
361

    
362
        r = self.get(url, HTTP_IF_MATCH=etag)
363

    
364
        # assert get success
365
        self.assertEqual(r.status_code, 200)
366

    
367
        # assert response content
368
        self.assertEqual(r.content, odata)
369

    
370
    def test_get_if_match_star(self):
371
        cname = self.containers[0]
372
        oname, odata = self.upload_object(cname)[:-1]
373

    
374
        # perform get with If-Match *
375
        url = join_urls(self.pithos_path, self.user, cname, oname)
376
        r = self.get(url, HTTP_IF_MATCH='*')
377

    
378
        # assert get success
379
        self.assertEqual(r.status_code, 200)
380

    
381
        # assert response content
382
        self.assertEqual(r.content, odata)
383

    
384
    def test_get_multiple_if_match(self):
385
        cname = self.containers[0]
386
        oname, odata = self.upload_object(cname)[:-1]
387

    
388
        # perform get with If-Match
389
        url = join_urls(self.pithos_path, self.user, cname, oname)
390

    
391
        if pithos_settings.UPDATE_MD5:
392
            etag = md5_hash(odata)
393
        else:
394
            etag = merkle(odata)
395

    
396
        quoted = lambda s: '"%s"' % s
397
        r = self.get(url, HTTP_IF_MATCH=','.join(
398
            [quoted(etag), quoted(get_random_data(64))]))
399

    
400
        # assert get success
401
        self.assertEqual(r.status_code, 200)
402

    
403
        # assert response content
404
        self.assertEqual(r.content, odata)
405

    
406
    def test_if_match_precondition_failed(self):
407
        cname = self.containers[0]
408
        oname, odata = self.upload_object(cname)[:-1]
409

    
410
        # perform get with If-Match
411
        url = join_urls(self.pithos_path, self.user, cname, oname)
412
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413
        self.assertEqual(r.status_code, 412)
414

    
415
    def test_if_none_match(self):
416
        # upload object
417
        cname = self.containers[0]
418
        oname, odata = self.upload_object(cname)[:-1]
419

    
420
        if pithos_settings.UPDATE_MD5:
421
            etag = md5_hash(odata)
422
        else:
423
            etag = merkle(odata)
424

    
425
        # perform get with If-None-Match
426
        url = join_urls(self.pithos_path, self.user, cname, oname)
427
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428

    
429
        # assert precondition_failed
430
        self.assertEqual(r.status_code, 304)
431

    
432
        # update object data
433
        r = self.append_object_data(cname, oname)[-1]
434
        self.assertTrue(etag != r['ETag'])
435

    
436
        # perform get with If-None-Match
437
        url = join_urls(self.pithos_path, self.user, cname, oname)
438
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439

    
440
        # assert get success
441
        self.assertEqual(r.status_code, 200)
442

    
443
    def test_if_none_match_star(self):
444
        # upload object
445
        cname = self.containers[0]
446
        oname, odata = self.upload_object(cname)[:-1]
447

    
448
        # perform get with If-None-Match with star
449
        url = join_urls(self.pithos_path, self.user, cname, oname)
450
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451

    
452
        self.assertEqual(r.status_code, 304)
453

    
454
    def test_if_modified_since(self):
455
        # upload object
456
        cname = self.containers[0]
457
        oname, odata = self.upload_object(cname)[:-1]
458
        object_info = self.get_object_info(cname, oname)
459
        last_modified = object_info['Last-Modified']
460
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461
        t1_formats = map(t1.strftime, DATE_FORMATS)
462

    
463
        # Check not modified since
464
        url = join_urls(self.pithos_path, self.user, cname, oname)
465
        for t in t1_formats:
466
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467
            self.assertEqual(r.status_code, 304)
468

    
469
        _time.sleep(1)
470

    
471
        # update object data
472
        appended_data = self.append_object_data(cname, oname)[1]
473

    
474
        # Check modified since
475
        url = join_urls(self.pithos_path, self.user, cname, oname)
476
        for t in t1_formats:
477
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478
            self.assertEqual(r.status_code, 200)
479
            self.assertEqual(r.content, odata + appended_data)
480

    
481
    def test_if_modified_since_invalid_date(self):
482
        cname = self.containers[0]
483
        oname, odata = self.upload_object(cname)[:-1]
484
        url = join_urls(self.pithos_path, self.user, cname, oname)
485
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486
        self.assertEqual(r.status_code, 200)
487
        self.assertEqual(r.content, odata)
488

    
489
    def test_if_not_modified_since(self):
490
        cname = self.containers[0]
491
        oname, odata = self.upload_object(cname)[:-1]
492
        url = join_urls(self.pithos_path, self.user, cname, oname)
493
        object_info = self.get_object_info(cname, oname)
494
        last_modified = object_info['Last-Modified']
495
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496

    
497
        # Check unmodified
498
        t1 = t + datetime.timedelta(seconds=1)
499
        t1_formats = map(t1.strftime, DATE_FORMATS)
500
        for t in t1_formats:
501
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502
            self.assertEqual(r.status_code, 200)
503
            self.assertEqual(r.content, odata)
504

    
505
        # modify object
506
        _time.sleep(2)
507
        self.append_object_data(cname, oname)
508

    
509
        object_info = self.get_object_info(cname, oname)
510
        last_modified = object_info['Last-Modified']
511
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512
        t2 = t - datetime.timedelta(seconds=1)
513
        t2_formats = map(t2.strftime, DATE_FORMATS)
514

    
515
        # check modified
516
        for t in t2_formats:
517
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518
            self.assertEqual(r.status_code, 412)
519

    
520
        # modify account: update object meta
521
        _time.sleep(1)
522
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523

    
524
        object_info = self.get_object_info(cname, oname)
525
        last_modified = object_info['Last-Modified']
526
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527
        t3 = t - datetime.timedelta(seconds=1)
528
        t3_formats = map(t3.strftime, DATE_FORMATS)
529

    
530
        # check modified
531
        for t in t3_formats:
532
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533
            self.assertEqual(r.status_code, 412)
534

    
535
    def test_if_unmodified_since(self):
536
        cname = self.containers[0]
537
        oname, odata = self.upload_object(cname)[:-1]
538
        url = join_urls(self.pithos_path, self.user, cname, oname)
539
        object_info = self.get_object_info(cname, oname)
540
        last_modified = object_info['Last-Modified']
541
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542
        t = t + datetime.timedelta(seconds=1)
543
        t_formats = map(t.strftime, DATE_FORMATS)
544

    
545
        for tf in t_formats:
546
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547
            self.assertEqual(r.status_code, 200)
548
            self.assertEqual(r.content, odata)
549

    
550
    def test_if_unmodified_since_precondition_failed(self):
551
        cname = self.containers[0]
552
        oname, odata = self.upload_object(cname)[:-1]
553
        url = join_urls(self.pithos_path, self.user, cname, oname)
554
        object_info = self.get_object_info(cname, oname)
555
        last_modified = object_info['Last-Modified']
556
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557
        t = t - datetime.timedelta(seconds=1)
558
        t_formats = map(t.strftime, DATE_FORMATS)
559

    
560
        for tf in t_formats:
561
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562
            self.assertEqual(r.status_code, 412)
563

    
564
    def test_hashes(self):
565
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566
        cname = self.containers[0]
567
        oname, odata = self.upload_object(cname, length=l)[:-1]
568
        size = len(odata)
569

    
570
        url = join_urls(self.pithos_path, self.user, cname, oname)
571
        r = self.get('%s?format=json&hashmap' % url)
572
        self.assertEqual(r.status_code, 200)
573
        body = json.loads(r.content)
574

    
575
        hashes = body['hashes']
576
        block_size = body['block_size']
577
        block_num = size / block_size if size / block_size == 0 else\
578
            size / block_size + 1
579
        self.assertTrue(len(hashes), block_num)
580
        i = 0
581
        for h in hashes:
582
            start = i * block_size
583
            end = (i + 1) * block_size
584
            hash = merkle(odata[start:end])
585
            self.assertEqual(h, hash)
586
            i += 1
587

    
588

    
589
class ObjectPut(PithosAPITest):
590
    def setUp(self):
591
        PithosAPITest.setUp(self)
592
        self.container = get_random_name()
593
        self.create_container(self.container)
594

    
595
    def test_upload(self):
596
        cname = self.container
597
        oname = get_random_name()
598
        data = get_random_data()
599
        meta = {'test': 'test1'}
600
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601
                       for k, v in meta.iteritems())
602
        url = join_urls(self.pithos_path, self.user, cname, oname)
603
        r = self.put(url, data=data, content_type='application/pdf', **headers)
604
        self.assertEqual(r.status_code, 201)
605
        self.assertTrue('ETag' in r)
606
        self.assertTrue('X-Object-Version' in r)
607

    
608
        info = self.get_object_info(cname, oname)
609

    
610
        # assert object meta
611
        self.assertTrue('X-Object-Meta-Test' in info)
612
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
613

    
614
        # assert content-type
615
        self.assertEqual(info['content-type'], 'application/pdf')
616

    
617
        # assert uploaded content
618
        r = self.get(url)
619
        self.assertEqual(r.status_code, 200)
620
        self.assertEqual(r.content, data)
621

    
622
    def test_maximum_upload_size_exceeds(self):
623
        cname = self.container
624
        oname = get_random_name()
625

    
626
        # set container quota to 100
627
        url = join_urls(self.pithos_path, self.user, cname)
628
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
629
        self.assertEqual(r.status_code, 202)
630

    
631
        info = self.get_container_info(cname)
632
        length = int(info['X-Container-Policy-Quota']) + 1
633

    
634
        data = get_random_data(length)
635
        url = join_urls(self.pithos_path, self.user, cname, oname)
636
        r = self.put(url, data=data)
637
        self.assertEqual(r.status_code, 413)
638

    
639
    def test_upload_with_name_containing_slash(self):
640
        cname = self.container
641
        oname = '/%s' % get_random_name()
642
        data = get_random_data()
643
        url = join_urls(self.pithos_path, self.user, cname, oname)
644
        r = self.put(url, data=data)
645
        self.assertEqual(r.status_code, 201)
646
        self.assertTrue('ETag' in r)
647
        self.assertTrue('X-Object-Version' in r)
648

    
649
        r = self.get(url)
650
        self.assertEqual(r.status_code, 200)
651
        self.assertEqual(r.content, data)
652

    
653
    def test_upload_unprocessable_entity(self):
654
        cname = self.container
655
        oname = get_random_name()
656
        data = get_random_data()
657
        url = join_urls(self.pithos_path, self.user, cname, oname)
658
        r = self.put(url, data=data, HTTP_ETAG='123')
659
        self.assertEqual(r.status_code, 422)
660

    
661
#    def test_chunked_transfer(self):
662
#        cname = self.container
663
#        oname = '/%s' % get_random_name()
664
#        data = get_random_data()
665
#        url = join_urls(self.pithos_path, self.user, cname, oname)
666
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
667
#        self.assertEqual(r.status_code, 201)
668
#        self.assertTrue('ETag' in r)
669
#        self.assertTrue('X-Object-Version' in r)
670

    
671
    def test_manifestation(self):
672
        cname = self.container
673
        prefix = 'myobject/'
674
        data = ''
675
        for i in range(random.randint(2, 10)):
676
            part = '%s%d' % (prefix, i)
677
            data += self.upload_object(cname, oname=part)[1]
678

    
679
        manifest = '%s/%s' % (cname, prefix)
680
        oname = get_random_name()
681
        url = join_urls(self.pithos_path, self.user, cname, oname)
682
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
683
        self.assertEqual(r.status_code, 201)
684

    
685
        # assert object exists
686
        r = self.get(url)
687
        self.assertEqual(r.status_code, 200)
688

    
689
        # assert its content
690
        self.assertEqual(r.content, data)
691

    
692
        # invalid manifestation
693
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
694
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
695
        r = self.get(url)
696
        self.assertEqual(r.content, '')
697

    
698
    def test_create_zero_length_object(self):
699
        cname = self.container
700
        oname = get_random_name()
701
        url = join_urls(self.pithos_path, self.user, cname, oname)
702
        r = self.put(url, data='')
703
        self.assertEqual(r.status_code, 201)
704

    
705
        r = self.get(url)
706
        self.assertEqual(r.status_code, 200)
707
        self.assertEqual(int(r['Content-Length']), 0)
708
        self.assertEqual(r.content, '')
709

    
710
        r = self.get('%s?hashmap=&format=json' % url)
711
        self.assertEqual(r.status_code, 200)
712
        body = json.loads(r.content)
713
        hashes = body['hashes']
714
        hash = merkle('')
715
        self.assertEqual(hashes, [hash])
716

    
717
    def test_create_object_by_hashmap(self):
718
        cname = self.container
719
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
720

    
721
        # upload an object
722
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
723
        # get it hashmap
724
        url = join_urls(self.pithos_path, self.user, cname, oname)
725
        r = self.get('%s?hashmap=&format=json' % url)
726

    
727
        oname = get_random_name()
728
        url = join_urls(self.pithos_path, self.user, cname, oname)
729
        r = self.put('%s?hashmap=' % url, data=r.content)
730
        self.assertEqual(r.status_code, 201)
731

    
732
        r = self.get(url)
733
        self.assertEqual(r.status_code, 200)
734
        self.assertEqual(r.content, data)
735

    
736
    def test_create_object_by_invalid_hashmap(self):
737
        cname = self.container
738
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
739

    
740
        # upload an object
741
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
742
        # get it hashmap
743
        url = join_urls(self.pithos_path, self.user, cname, oname)
744
        r = self.get('%s?hashmap=&format=json' % url)
745
        data = r.content
746
        try:
747
            hashmap = json.loads(data)
748
        except:
749
            self.fail('JSON format expected')
750

    
751
        oname = get_random_name()
752
        url = join_urls(self.pithos_path, self.user, cname, oname)
753
        hashmap['hashes'] = [get_random_name()]
754
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
755
        self.assertEqual(r.status_code, 400)
756

    
757

    
758
class ObjectPutCopy(PithosAPITest):
759
    def setUp(self):
760
        PithosAPITest.setUp(self)
761
        self.container = 'c1'
762
        self.create_container(self.container)
763
        self.object, self.data = self.upload_object(self.container)[:-1]
764

    
765
        url = join_urls(
766
            self.pithos_path, self.user, self.container, self.object)
767
        r = self.head(url)
768
        self.etag = r['X-Object-Hash']
769

    
770
    def test_copy(self):
771
        with AssertMappingInvariant(self.get_object_info, self.container,
772
                                    self.object):
773
            # copy object
774
            oname = get_random_name()
775
            url = join_urls(self.pithos_path, self.user, self.container, oname)
776
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
777
                         HTTP_X_COPY_FROM='/%s/%s' % (
778
                             self.container, self.object))
779

    
780
            # assert copy success
781
            self.assertEqual(r.status_code, 201)
782

    
783
            # assert access the new object
784
            r = self.head(url)
785
            self.assertEqual(r.status_code, 200)
786
            self.assertTrue('X-Object-Meta-Test' in r)
787
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
788

    
789
            # assert etag is the same
790
            self.assertTrue('X-Object-Hash' in r)
791
            self.assertEqual(r['X-Object-Hash'], self.etag)
792

    
793
    def test_copy_from_different_container(self):
794
        cname = 'c2'
795
        self.create_container(cname)
796
        with AssertMappingInvariant(self.get_object_info, self.container,
797
                                    self.object):
798
            oname = get_random_name()
799
            url = join_urls(self.pithos_path, self.user, cname, oname)
800
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
801
                         HTTP_X_COPY_FROM='/%s/%s' % (
802
                             self.container, self.object))
803

    
804
            # assert copy success
805
            self.assertEqual(r.status_code, 201)
806

    
807
            # assert access the new object
808
            r = self.head(url)
809
            self.assertEqual(r.status_code, 200)
810
            self.assertTrue('X-Object-Meta-Test' in r)
811
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
812

    
813
            # assert etag is the same
814
            self.assertTrue('X-Object-Hash' in r)
815
            self.assertEqual(r['X-Object-Hash'], self.etag)
816

    
817
    def test_copy_from_other_account(self):
818
        cname = 'c2'
819
        self.create_container(cname, user='chuck')
820
        self.create_container(cname, user='alice')
821

    
822
        # share object for read with alice
823
        url = join_urls(self.pithos_path, self.user, self.container,
824
                        self.object)
825
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
826
                      HTTP_X_OBJECT_SHARING='read=alice')
827
        self.assertEqual(r.status_code, 202)
828

    
829
        # assert not allowed for chuck
830
        oname = get_random_name()
831
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
832
        r = self.put(url, data='', user='chuck',
833
                     HTTP_X_OBJECT_META_TEST='testcopy',
834
                     HTTP_X_COPY_FROM='/%s/%s' % (
835
                         self.container, self.object),
836
                     HTTP_X_SOURCE_ACCOUNT='user')
837

    
838
        self.assertEqual(r.status_code, 403)
839

    
840
        # assert copy success for alice
841
        url = join_urls(self.pithos_path, 'alice', cname, oname)
842
        r = self.put(url, data='', user='alice',
843
                     HTTP_X_OBJECT_META_TEST='testcopy',
844
                     HTTP_X_COPY_FROM='/%s/%s' % (
845
                         self.container, self.object),
846
                     HTTP_X_SOURCE_ACCOUNT='user')
847
        self.assertEqual(r.status_code, 201)
848

    
849
        # assert access the new object
850
        r = self.head(url, user='alice')
851
        self.assertEqual(r.status_code, 200)
852
        self.assertTrue('X-Object-Meta-Test' in r)
853
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
854

    
855
        # assert etag is the same
856
        self.assertTrue('X-Object-Hash' in r)
857
        self.assertEqual(r['X-Object-Hash'], self.etag)
858

    
859
        # share object for write
860
        url = join_urls(self.pithos_path, self.user, self.container,
861
                        self.object)
862
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
863
                      HTTP_X_OBJECT_SHARING='write=dan')
864
        self.assertEqual(r.status_code, 202)
865

    
866
        # assert not allowed copy for alice
867
        url = join_urls(self.pithos_path, 'alice', cname, oname)
868
        r = self.put(url, data='', user='alice',
869
                     HTTP_X_OBJECT_META_TEST='testcopy',
870
                     HTTP_X_COPY_FROM='/%s/%s' % (
871
                         self.container, self.object),
872
                     HTTP_X_SOURCE_ACCOUNT='user')
873
        self.assertEqual(r.status_code, 403)
874

    
875
        # assert allowed copy for dan
876
        self.create_container(cname, user='dan')
877
        url = join_urls(self.pithos_path, 'dan', cname, oname)
878
        r = self.put(url, data='', user='dan',
879
                     HTTP_X_OBJECT_META_TEST='testcopy',
880
                     HTTP_X_COPY_FROM='/%s/%s' % (
881
                         self.container, self.object),
882
                     HTTP_X_SOURCE_ACCOUNT='user')
883
        self.assertEqual(r.status_code, 201)
884

    
885
        # assert access the new object
886
        r = self.head(url, user='dan')
887
        self.assertEqual(r.status_code, 200)
888
        self.assertTrue('X-Object-Meta-Test' in r)
889
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
890

    
891
        # assert etag is the same
892
        self.assertTrue('X-Object-Hash' in r)
893
        self.assertEqual(r['X-Object-Hash'], self.etag)
894

    
895
        # assert source object still exists
896
        url = join_urls(self.pithos_path, self.user, self.container,
897
                        self.object)
898
        r = self.head(url)
899
        self.assertEqual(r.status_code, 200)
900
        # assert etag is the same
901
        self.assertTrue('X-Object-Hash' in r)
902
        self.assertEqual(r['X-Object-Hash'], self.etag)
903

    
904
        r = self.get(url)
905
        self.assertEqual(r.status_code, 200)
906
        # assert etag is the same
907
        self.assertTrue('X-Object-Hash' in r)
908
        self.assertEqual(r['X-Object-Hash'], self.etag)
909

    
910
    def test_copy_invalid(self):
911
        # copy from non-existent object
912
        oname = get_random_name()
913
        url = join_urls(self.pithos_path, self.user, self.container, oname)
914
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
915
                     HTTP_X_COPY_FROM='/%s/%s' % (
916
                         self.container, get_random_name()))
917
        self.assertEqual(r.status_code, 404)
918

    
919
        # copy from non-existent container
920
        oname = get_random_name()
921
        url = join_urls(self.pithos_path, self.user, self.container, oname)
922
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
923
                     HTTP_X_COPY_FROM='/%s/%s' % (
924
                         get_random_name(), self.object))
925
        self.assertEqual(r.status_code, 404)
926

    
927
    def test_copy_dir(self):
928
        folder = self.create_folder(self.container)[0]
929
        subfolder = self.create_folder(
930
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
931
        objects = [subfolder]
932
        append = objects.append
933
        append(self.upload_object(self.container,
934
                                  '%s/%s' % (folder, get_random_name()),
935
                                  depth='1')[0])
936
        append(self.upload_object(self.container,
937
                                  '%s/%s' % (subfolder, get_random_name()),
938
                                  depth='2')[0])
939
        other = self.upload_object(self.container, strnextling(folder))[0]
940

    
941
        # copy dir
942
        copy_folder = self.create_folder(self.container)[0]
943
        url = join_urls(self.pithos_path, self.user, self.container,
944
                        copy_folder)
945
        r = self.put('%s?delimiter=/' % url, data='',
946
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
947
        self.assertEqual(r.status_code, 201)
948

    
949
        for obj in objects:
950
            # assert object exists
951
            url = join_urls(self.pithos_path, self.user, self.container,
952
                            obj.replace(folder, copy_folder))
953
            r = self.head(url)
954
            self.assertEqual(r.status_code, 200)
955

    
956
            # assert metadata
957
            meta = self.get_object_meta(self.container, obj)
958
            for k in meta.keys():
959
                key = 'X-Object-Meta-%s' % k
960
                self.assertTrue(key in r)
961
                self.assertEqual(r[key], meta[k])
962

    
963
        # assert other has not been created under copy folder
964
        url = join_urls(self.pithos_path, self.user, self.container,
965
                        '%s/%s' % (copy_folder,
966
                                   other.replace(folder, copy_folder)))
967
        r = self.head(url)
968
        self.assertEqual(r.status_code, 404)
969

    
970

    
971
class ObjectPutMove(PithosAPITest):
972
    def setUp(self):
973
        PithosAPITest.setUp(self)
974
        self.container = 'c1'
975
        self.create_container(self.container)
976
        self.object, self.data = self.upload_object(self.container)[:-1]
977

    
978
        url = join_urls(
979
            self.pithos_path, self.user, self.container, self.object)
980
        r = self.head(url)
981
        self.etag = r['X-Object-Hash']
982

    
983
    def test_move(self):
984
        # move object
985
        oname = get_random_name()
986
        url = join_urls(self.pithos_path, self.user, self.container, oname)
987
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
988
                     HTTP_X_MOVE_FROM='/%s/%s' % (
989
                         self.container, self.object))
990

    
991
        # assert move success
992
        self.assertEqual(r.status_code, 201)
993

    
994
        # assert access the new object
995
        r = self.head(url)
996
        self.assertEqual(r.status_code, 200)
997
        self.assertTrue('X-Object-Meta-Test' in r)
998
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
999

    
1000
        # assert etag is the same
1001
        self.assertTrue('X-Object-Hash' in r)
1002

    
1003
        # assert the initial object has been deleted
1004
        url = join_urls(self.pithos_path, self.user, self.container,
1005
                        self.object)
1006
        r = self.head(url)
1007
        self.assertEqual(r.status_code, 404)
1008

    
1009
    def test_move_dir(self):
1010
        folder = self.create_folder(self.container)[0]
1011
        subfolder = self.create_folder(
1012
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1013
        objects = [subfolder]
1014
        append = objects.append
1015
        append(self.upload_object(self.container,
1016
                                  '%s/%s' % (folder, get_random_name()),
1017
                                  depth='1')[0])
1018
        append(self.upload_object(self.container,
1019
                                  '%s/%s' % (subfolder, get_random_name()),
1020
                                  depth='1')[0])
1021
        other = self.upload_object(self.container, strnextling(folder))[0]
1022

    
1023
        # move dir
1024
        copy_folder = self.create_folder(self.container)[0]
1025
        url = join_urls(self.pithos_path, self.user, self.container,
1026
                        copy_folder)
1027
        r = self.put('%s?delimiter=/' % url, data='',
1028
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1029
        self.assertEqual(r.status_code, 201)
1030

    
1031
        for obj in objects:
1032
            # assert initial object does not exist
1033
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1034
            r = self.head(url)
1035
            self.assertEqual(r.status_code, 404)
1036

    
1037
            # assert new object was created
1038
            url = join_urls(self.pithos_path, self.user, self.container,
1039
                            obj.replace(folder, copy_folder))
1040
            r = self.head(url)
1041
            self.assertEqual(r.status_code, 200)
1042

    
1043
        # assert other has not been created under copy folder
1044
        url = join_urls(self.pithos_path, self.user, self.container,
1045
                        '%s/%s' % (copy_folder,
1046
                                   other.replace(folder, copy_folder)))
1047
        r = self.head(url)
1048
        self.assertEqual(r.status_code, 404)
1049

    
1050
    def test_move_from_other_account(self):
1051
        cname = 'c2'
1052
        self.create_container(cname, user='chuck')
1053
        self.create_container(cname, user='alice')
1054

    
1055
        # share object for read with alice
1056
        url = join_urls(self.pithos_path, self.user, self.container,
1057
                        self.object)
1058
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1059
                      HTTP_X_OBJECT_SHARING='read=alice')
1060
        self.assertEqual(r.status_code, 202)
1061

    
1062
        # assert not allowed move for chuck
1063
        oname = get_random_name()
1064
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1065
        r = self.put(url, data='', user='chuck',
1066
                     HTTP_X_OBJECT_META_TEST='testcopy',
1067
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1068
                         self.container, self.object),
1069
                     HTTP_X_SOURCE_ACCOUNT='user')
1070

    
1071
        self.assertEqual(r.status_code, 403)
1072

    
1073
        # assert no new object was created
1074
        r = self.head(url, user='chuck')
1075
        self.assertEqual(r.status_code, 404)
1076

    
1077
        # assert not allowed move for alice
1078
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1079
        r = self.put(url, data='', user='alice',
1080
                     HTTP_X_OBJECT_META_TEST='testcopy',
1081
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1082
                         self.container, self.object),
1083
                     HTTP_X_SOURCE_ACCOUNT='user')
1084
        self.assertEqual(r.status_code, 403)
1085

    
1086
        # assert no new object was created
1087
        r = self.head(url, user='alice')
1088
        self.assertEqual(r.status_code, 404)
1089

    
1090
        # share object for write with dan
1091
        url = join_urls(self.pithos_path, self.user, self.container,
1092
                        self.object)
1093
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1094
                      HTTP_X_OBJECT_SHARING='write=dan')
1095
        self.assertEqual(r.status_code, 202)
1096

    
1097
        # assert not allowed move for alice
1098
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1099
        r = self.put(url, data='', user='alice',
1100
                     HTTP_X_OBJECT_META_TEST='testcopy',
1101
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1102
                         self.container, self.object),
1103
                     HTTP_X_SOURCE_ACCOUNT='user')
1104
        self.assertEqual(r.status_code, 403)
1105

    
1106
        # assert no new object was created
1107
        r = self.head(url, user='alice')
1108
        self.assertEqual(r.status_code, 404)
1109

    
1110
        # assert not allowed move for dan
1111
        self.create_container(cname, user='dan')
1112
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1113
        r = self.put(url, data='', user='dan',
1114
                     HTTP_X_OBJECT_META_TEST='testcopy',
1115
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1116
                         self.container, self.object),
1117
                     HTTP_X_SOURCE_ACCOUNT='user')
1118
        self.assertEqual(r.status_code, 403)
1119

    
1120
        # assert no new object was created
1121
        r = self.head(url, user='dan')
1122
        self.assertEqual(r.status_code, 404)
1123

    
1124

    
1125
class ObjectCopy(PithosAPITest):
1126
    def setUp(self):
1127
        PithosAPITest.setUp(self)
1128
        self.container = 'c1'
1129
        self.create_container(self.container)
1130
        self.object, self.data = self.upload_object(self.container)[:-1]
1131

    
1132
        url = join_urls(
1133
            self.pithos_path, self.user, self.container, self.object)
1134
        r = self.head(url)
1135
        self.etag = r['X-Object-Hash']
1136

    
1137
    def test_copy(self):
1138
        with AssertMappingInvariant(self.get_object_info, self.container,
1139
                                    self.object):
1140
            oname = get_random_name()
1141
            # copy object
1142
            url = join_urls(self.pithos_path, self.user, self.container,
1143
                            self.object)
1144
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1145
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1146
                                                       oname))
1147
            # assert copy success
1148
            url = join_urls(self.pithos_path, self.user, self.container,
1149
                            oname)
1150
            self.assertEqual(r.status_code, 201)
1151

    
1152
            # assert access the new object
1153
            r = self.head(url)
1154
            self.assertEqual(r.status_code, 200)
1155
            self.assertTrue('X-Object-Meta-Test' in r)
1156
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1157

    
1158
            # assert etag is the same
1159
            self.assertTrue('X-Object-Hash' in r)
1160
            self.assertEqual(r['X-Object-Hash'], self.etag)
1161

    
1162
            # assert source object still exists
1163
            url = join_urls(self.pithos_path, self.user, self.container,
1164
                            self.object)
1165
            r = self.head(url)
1166
            self.assertEqual(r.status_code, 200)
1167

    
1168
            # assert etag is the same
1169
            self.assertTrue('X-Object-Hash' in r)
1170
            self.assertEqual(r['X-Object-Hash'], self.etag)
1171

    
1172
            r = self.get(url)
1173
            self.assertEqual(r.status_code, 200)
1174

    
1175
            # assert etag is the same
1176
            self.assertTrue('X-Object-Hash' in r)
1177
            self.assertEqual(r['X-Object-Hash'], self.etag)
1178

    
1179
            # copy object to other container (not existing)
1180
            cname = get_random_name()
1181
            url = join_urls(self.pithos_path, self.user, self.container,
1182
                            self.object)
1183
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1184
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1185

    
1186
            # assert destination container does not exist
1187
            url = join_urls(self.pithos_path, self.user, cname,
1188
                            self.object)
1189
            self.assertEqual(r.status_code, 404)
1190

    
1191
            # create container
1192
            self.create_container(cname)
1193

    
1194
            # copy object to other container (existing)
1195
            url = join_urls(self.pithos_path, self.user, self.container,
1196
                            self.object)
1197
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1198
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1199

    
1200
            # assert copy success
1201
            url = join_urls(self.pithos_path, self.user, cname,
1202
                            self.object)
1203
            self.assertEqual(r.status_code, 201)
1204

    
1205
            # assert access the new object
1206
            r = self.head(url)
1207
            self.assertEqual(r.status_code, 200)
1208
            self.assertTrue('X-Object-Meta-Test' in r)
1209
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1210

    
1211
            # assert etag is the same
1212
            self.assertTrue('X-Object-Hash' in r)
1213
            self.assertEqual(r['X-Object-Hash'], self.etag)
1214

    
1215
                        # assert source object still exists
1216
            url = join_urls(self.pithos_path, self.user, self.container,
1217
                            self.object)
1218
            r = self.head(url)
1219
            self.assertEqual(r.status_code, 200)
1220

    
1221
            # assert etag is the same
1222
            self.assertTrue('X-Object-Hash' in r)
1223
            self.assertEqual(r['X-Object-Hash'], self.etag)
1224

    
1225
            r = self.get(url)
1226
            self.assertEqual(r.status_code, 200)
1227

    
1228
            # assert etag is the same
1229
            self.assertTrue('X-Object-Hash' in r)
1230
            self.assertEqual(r['X-Object-Hash'], self.etag)
1231

    
1232
    def test_copy_to_other_account(self):
1233
        # create a container under alice account
1234
        cname = self.create_container(user='alice')[0]
1235

    
1236
        # create a folder under this container
1237
        folder = self.create_folder(cname, user='alice')[0]
1238

    
1239
        oname = get_random_name()
1240

    
1241
        # copy object to other account container
1242
        url = join_urls(self.pithos_path, self.user, self.container,
1243
                        self.object)
1244
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1245
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1246
                      HTTP_DESTINATION_ACCOUNT='alice')
1247
        self.assertEqual(r.status_code, 403)
1248

    
1249
        # share object for read with user
1250
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1251
        r = self.post(url, user='alice', content_type='',
1252
                      HTTP_CONTENT_RANGE='bytes */*',
1253
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1254
        self.assertEqual(r.status_code, 202)
1255

    
1256
        # assert copy object still is not allowed
1257
        url = join_urls(self.pithos_path, self.user, self.container,
1258
                        self.object)
1259
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1260
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1261
                      HTTP_DESTINATION_ACCOUNT='alice')
1262
        self.assertEqual(r.status_code, 403)
1263

    
1264
        # share object for write with user
1265
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1266
        r = self.post(url, user='alice',  content_type='',
1267
                      HTTP_CONTENT_RANGE='bytes */*',
1268
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1269
        self.assertEqual(r.status_code, 202)
1270

    
1271
        # assert copy object now is allowed
1272
        url = join_urls(self.pithos_path, self.user, self.container,
1273
                        self.object)
1274
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1275
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1276
                      HTTP_DESTINATION_ACCOUNT='alice')
1277
        self.assertEqual(r.status_code, 201)
1278

    
1279
        # assert access the new object
1280
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1281
        r = self.head(url, user='alice')
1282
        self.assertEqual(r.status_code, 200)
1283
        self.assertTrue('X-Object-Meta-Test' in r)
1284
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1285

    
1286
        # assert etag is the same
1287
        self.assertTrue('X-Object-Hash' in r)
1288
        self.assertEqual(r['X-Object-Hash'], self.etag)
1289

    
1290
        # assert source object still exists
1291
        url = join_urls(self.pithos_path, self.user, self.container,
1292
                        self.object)
1293
        r = self.head(url)
1294
        self.assertEqual(r.status_code, 200)
1295

    
1296
        # assert etag is the same
1297
        self.assertTrue('X-Object-Hash' in r)
1298
        self.assertEqual(r['X-Object-Hash'], self.etag)
1299

    
1300
        r = self.get(url)
1301
        self.assertEqual(r.status_code, 200)
1302

    
1303
        # assert etag is the same
1304
        self.assertTrue('X-Object-Hash' in r)
1305
        self.assertEqual(r['X-Object-Hash'], self.etag)
1306

    
1307

    
1308
class ObjectMove(PithosAPITest):
1309
    def setUp(self):
1310
        PithosAPITest.setUp(self)
1311
        self.container = 'c1'
1312
        self.create_container(self.container)
1313
        self.object, self.data = self.upload_object(self.container)[:-1]
1314

    
1315
        url = join_urls(
1316
            self.pithos_path, self.user, self.container, self.object)
1317
        r = self.head(url)
1318
        self.etag = r['X-Object-Hash']
1319

    
1320
    def test_move(self):
1321
        oname = get_random_name()
1322

    
1323
        # move object
1324
        url = join_urls(self.pithos_path, self.user, self.container,
1325
                        self.object)
1326
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1327
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1328
                                                   oname))
1329
        # assert move success
1330
        url = join_urls(self.pithos_path, self.user, self.container,
1331
                        oname)
1332
        self.assertEqual(r.status_code, 201)
1333

    
1334
        # assert access the new object
1335
        r = self.head(url)
1336
        self.assertEqual(r.status_code, 200)
1337
        self.assertTrue('X-Object-Meta-Test' in r)
1338
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1339

    
1340
        # assert etag is the same
1341
        self.assertTrue('X-Object-Hash' in r)
1342
        self.assertEqual(r['X-Object-Hash'], self.etag)
1343

    
1344
        # assert source object does not exist
1345
        url = join_urls(self.pithos_path, self.user, self.container,
1346
                        self.object)
1347
        r = self.head(url)
1348
        self.assertEqual(r.status_code, 404)
1349

    
1350
    def test_move_to_other_container(self):
1351
        # move object to other container (not existing)
1352
        cname = get_random_name()
1353
        url = join_urls(self.pithos_path, self.user, self.container,
1354
                        self.object)
1355
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1356
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1357

    
1358
        # assert destination container does not exist
1359
        url = join_urls(self.pithos_path, self.user, cname,
1360
                        self.object)
1361
        self.assertEqual(r.status_code, 404)
1362

    
1363
        # create container
1364
        self.create_container(cname)
1365

    
1366
        # move object to other container (existing)
1367
        url = join_urls(self.pithos_path, self.user, self.container,
1368
                        self.object)
1369
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1370
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1371

    
1372
        # assert move success
1373
        url = join_urls(self.pithos_path, self.user, cname,
1374
                        self.object)
1375
        self.assertEqual(r.status_code, 201)
1376

    
1377
        # assert access the new object
1378
        r = self.head(url)
1379
        self.assertEqual(r.status_code, 200)
1380
        self.assertTrue('X-Object-Meta-Test' in r)
1381
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1382

    
1383
        # assert etag is the same
1384
        self.assertTrue('X-Object-Hash' in r)
1385
        self.assertEqual(r['X-Object-Hash'], self.etag)
1386

    
1387
        # assert source object does not exist
1388
        url = join_urls(self.pithos_path, self.user, self.container,
1389
                        self.object)
1390
        r = self.head(url)
1391
        self.assertEqual(r.status_code, 404)
1392

    
1393
    def test_move_to_other_account(self):
1394
        # create a container under alice account
1395
        cname = self.create_container(user='alice')[0]
1396

    
1397
        # create a folder under this container
1398
        folder = self.create_folder(cname, user='alice')[0]
1399

    
1400
        oname = get_random_name()
1401

    
1402
        # move object to other account container
1403
        url = join_urls(self.pithos_path, self.user, self.container,
1404
                        self.object)
1405
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1406
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1407
                      HTTP_DESTINATION_ACCOUNT='alice')
1408
        self.assertEqual(r.status_code, 403)
1409

    
1410
        # share object for read with user
1411
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1412
        r = self.post(url, user='alice', content_type='',
1413
                      HTTP_CONTENT_RANGE='bytes */*',
1414
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1415
        self.assertEqual(r.status_code, 202)
1416

    
1417
        # assert move object still is not allowed
1418
        url = join_urls(self.pithos_path, self.user, self.container,
1419
                        self.object)
1420
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1421
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1422
                      HTTP_DESTINATION_ACCOUNT='alice')
1423
        self.assertEqual(r.status_code, 403)
1424

    
1425
        # share object for write with user
1426
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1427
        r = self.post(url, user='alice',  content_type='',
1428
                      HTTP_CONTENT_RANGE='bytes */*',
1429
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1430
        self.assertEqual(r.status_code, 202)
1431

    
1432
        # assert move object now is allowed
1433
        url = join_urls(self.pithos_path, self.user, self.container,
1434
                        self.object)
1435
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1436
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1437
                      HTTP_DESTINATION_ACCOUNT='alice')
1438
        self.assertEqual(r.status_code, 201)
1439

    
1440
        # assert source object does not exist
1441
        url = join_urls(self.pithos_path, self.user, self.container,
1442
                        self.object)
1443
        r = self.head(url)
1444
        self.assertEqual(r.status_code, 404)
1445

    
1446

    
1447
class ObjectPost(PithosAPITest):
1448
    def setUp(self):
1449
        PithosAPITest.setUp(self)
1450
        self.container = 'c1'
1451
        self.create_container(self.container)
1452
        self.object, self.object_data = self.upload_object(self.container)[:2]
1453

    
1454
    def test_update_meta(self):
1455
        with AssertUUidInvariant(self.get_object_info,
1456
                                 self.container,
1457
                                 self.object):
1458
            # update metadata
1459
            d = {'a' * 114: 'b' * 256}
1460
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1461
                          k, v in d.items())
1462
            url = join_urls(self.pithos_path, self.user, self.container,
1463
                            self.object)
1464
            r = self.post(url, content_type='', **kwargs)
1465
            self.assertEqual(r.status_code, 202)
1466

    
1467
            # assert metadata have been updated
1468
            meta = self.get_object_meta(self.container, self.object)
1469

    
1470
            for k, v in d.items():
1471
                self.assertTrue(k.title() in meta)
1472
                self.assertTrue(meta[k.title()], v)
1473

    
1474
            # Header key too large
1475
            d = {'a' * 115: 'b' * 256}
1476
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1477
                          k, v in d.items())
1478
            r = self.post(url, content_type='', **kwargs)
1479
            self.assertEqual(r.status_code, 400)
1480

    
1481
            # Header value too large
1482
            d = {'a' * 114: 'b' * 257}
1483
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1484
                          k, v in d.items())
1485
            r = self.post(url, content_type='', **kwargs)
1486
            self.assertEqual(r.status_code, 400)
1487

    
1488
#            # Check utf-8 meta
1489
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1490
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1491
#                          k, v in d.items())
1492
#            url = join_urls(self.pithos_path, self.user, self.container,
1493
#                            self.object)
1494
#            r = self.post(url, content_type='', **kwargs)
1495
#            self.assertEqual(r.status_code, 202)
1496
#
1497
#            # assert metadata have been updated
1498
#            meta = self.get_object_meta(self.container, self.object)
1499
#
1500
#            for k, v in d.items():
1501
#                key = 'X-Object-Meta-%s' % k.title()
1502
#                self.assertTrue(key in meta)
1503
#                self.assertTrue(meta[key], v)
1504
#
1505
#            # Header key too large
1506
#            d = {'α' * 114: 'β' * (256 / 2)}
1507
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1508
#                          k, v in d.items())
1509
#            r = self.post(url, content_type='', **kwargs)
1510
#            self.assertEqual(r.status_code, 400)
1511
#
1512
#            # Header value too large
1513
#            d = {'α' * 114: 'β' * 256}
1514
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1515
#                          k, v in d.items())
1516
#            r = self.udpate(url, content_type='', **kwargs)
1517
#            self.assertEqual(r.status_code, 400)
1518

    
1519
    def test_update_object(self):
1520
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1521
        oname, odata = self.upload_object(
1522
            self.container, length=random.randint(
1523
                block_size + 1, 2 * block_size))[:2]
1524

    
1525
        length = len(odata)
1526
        first_byte_pos = random.randint(1, block_size)
1527
        last_byte_pos = random.randint(block_size + 1, length - 1)
1528
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1529
        kwargs = {'content_type': 'application/octet-stream',
1530
                  'HTTP_CONTENT_RANGE': range}
1531

    
1532
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1533
        partial = last_byte_pos - first_byte_pos + 1
1534
        data = get_random_data(partial)
1535
        r = self.post(url, data=data, **kwargs)
1536

    
1537
        self.assertEqual(r.status_code, 204)
1538
        self.assertTrue('ETag' in r)
1539
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1540
                                     data)
1541
        if pithos_settings.UPDATE_MD5:
1542
            etag = md5_hash(updated_data)
1543
        else:
1544
            etag = merkle(updated_data)
1545
        #self.assertEqual(r['ETag'], etag)
1546

    
1547
        # check modified object
1548
        r = self.get(url)
1549

    
1550
        self.assertEqual(r.status_code, 200)
1551
        self.assertEqual(r.content, updated_data)
1552
        self.assertEqual(etag, r['ETag'])
1553

    
1554
    def test_update_object_divided_by_blocksize(self):
1555
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1556
        oname, odata = self.upload_object(self.container,
1557
                                          length=2 * block_size)[:2]
1558

    
1559
        length = len(odata)
1560
        first_byte_pos = block_size
1561
        last_byte_pos = 2 * block_size - 1
1562
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1563
        kwargs = {'content_type': 'application/octet-stream',
1564
                  'HTTP_CONTENT_RANGE': range}
1565

    
1566
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1567
        partial = last_byte_pos - first_byte_pos + 1
1568
        data = get_random_data(partial)
1569
        r = self.post(url, data=data, **kwargs)
1570

    
1571
        self.assertEqual(r.status_code, 204)
1572
        self.assertTrue('ETag' in r)
1573
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1574
                                     data)
1575
        if pithos_settings.UPDATE_MD5:
1576
            etag = md5_hash(updated_data)
1577
        else:
1578
            etag = merkle(updated_data)
1579
        #self.assertEqual(r['ETag'], etag)
1580

    
1581
        # check modified object
1582
        r = self.get(url)
1583

    
1584
        self.assertEqual(r.status_code, 200)
1585
        self.assertEqual(r.content, updated_data)
1586
        self.assertEqual(etag, r['ETag'])
1587

    
1588
    def test_update_object_invalid_content_length(self):
1589
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1590
        oname, odata = self.upload_object(
1591
            self.container, length=random.randint(
1592
                block_size + 1, 2 * block_size))[:2]
1593

    
1594
        length = len(odata)
1595
        first_byte_pos = random.randint(1, block_size)
1596
        last_byte_pos = random.randint(block_size + 1, length - 1)
1597
        partial = last_byte_pos - first_byte_pos + 1
1598
        data = get_random_data(partial)
1599
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1600
        kwargs = {'content_type': 'application/octet-stream',
1601
                  'HTTP_CONTENT_RANGE': range,
1602
                  'CONTENT_LENGTH': str(partial + 1)}
1603

    
1604
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1605
        r = self.post(url, data=data, **kwargs)
1606

    
1607
        self.assertEqual(r.status_code, 400)
1608

    
1609
    def test_update_object_invalid_range(self):
1610
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1611
        oname, odata = self.upload_object(
1612
            self.container, length=random.randint(block_size + 1,
1613
                                                  2 * block_size))[:2]
1614

    
1615
        length = len(odata)
1616
        first_byte_pos = random.randint(1, block_size)
1617
        last_byte_pos = first_byte_pos - 1
1618
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1619
        kwargs = {'content_type': 'application/octet-stream',
1620
                  'HTTP_CONTENT_RANGE': range}
1621

    
1622
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1623
        r = self.post(url, data=get_random_data(), **kwargs)
1624

    
1625
        self.assertEqual(r.status_code, 416)
1626

    
1627
    def test_update_object_out_of_limits(self):
1628
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1629
        oname, odata = self.upload_object(
1630
            self.container, length=random.randint(block_size + 1,
1631
                                                  2 * block_size))[:2]
1632

    
1633
        length = len(odata)
1634
        first_byte_pos = random.randint(1, block_size)
1635
        last_byte_pos = length + 1
1636
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1637
        kwargs = {'content_type': 'application/octet-stream',
1638
                  'HTTP_CONTENT_RANGE': range}
1639

    
1640
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1641
        r = self.post(url, data=get_random_data(), **kwargs)
1642

    
1643
        self.assertEqual(r.status_code, 416)
1644

    
1645
    def test_append(self):
1646
        data = get_random_data()
1647
        length = len(data)
1648
        url = join_urls(self.pithos_path, self.user, self.container,
1649
                        self.object)
1650
        r = self.post(url, data=data, content_type='application/octet-stream',
1651
                      HTTP_CONTENT_LENGTH=str(length),
1652
                      HTTP_CONTENT_RANGE='bytes */*')
1653
        self.assertEqual(r.status_code, 204)
1654

    
1655
        r = self.get(url)
1656
        content = r.content
1657
        self.assertEqual(len(content), len(self.object_data) + length)
1658
        self.assertEqual(content, self.object_data + data)
1659

    
1660
    # TODO Fix the test
1661
    def _test_update_with_chunked_transfer(self):
1662
        data = get_random_data()
1663
        length = len(data)
1664

    
1665
        url = join_urls(self.pithos_path, self.user, self.container,
1666
                        self.object)
1667
        r = self.post(url, data=data, content_type='application/octet-stream',
1668
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1669
                      HTTP_TRANSFER_ENCODING='chunked')
1670
        self.assertEqual(r.status_code, 204)
1671

    
1672
        # check modified object
1673
        r = self.get(url)
1674
        content = r.content
1675
        self.assertEqual(content[0:length], data)
1676
        self.assertEqual(content[length:], self.object_data[length:])
1677

    
1678
    def test_update_from_other_object(self):
1679
        src = self.object
1680
        dest = get_random_name()
1681

    
1682
        url = join_urls(self.pithos_path, self.user, self.container, src)
1683
        r = self.get(url)
1684
        source_data = r.content
1685
        source_meta = self.get_object_info(self.container, src)
1686

    
1687
        # update zero length object
1688
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1689
        r = self.put(url, data='')
1690
        self.assertEqual(r.status_code, 201)
1691

    
1692
        r = self.post(url,
1693
                      HTTP_CONTENT_RANGE='bytes */*',
1694
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1695
        self.assertEqual(r.status_code, 204)
1696

    
1697
        r = self.get(url)
1698
        dest_data = r.content
1699
        dest_meta = self.get_object_info(self.container, dest)
1700

    
1701
        self.assertEqual(source_data, dest_data)
1702
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1703
        self.assertEqual(source_meta['X-Object-Hash'],
1704
                         dest_meta['X-Object-Hash'])
1705
        self.assertTrue(
1706
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1707

    
1708
    def test_update_range_from_other_object(self):
1709
        src = self.object
1710
        dest = get_random_name()
1711

    
1712
        url = join_urls(self.pithos_path, self.user, self.container, src)
1713
        r = self.get(url)
1714
        source_data = r.content
1715
        source_length = len(source_data)
1716

    
1717
        # update zero length object
1718
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1719
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1720
        initial_data = get_random_data(
1721
            length=source_length + random.randint(0, block_size))
1722

    
1723
        r = self.put(url, data=initial_data)
1724
        self.assertEqual(r.status_code, 201)
1725

    
1726
        offset = random.randint(0, source_length - 1)
1727
        upto = random.randint(offset, source_length - 1)
1728
        r = self.post(url,
1729
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1730
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1731
        self.assertEqual(r.status_code, 204)
1732

    
1733
        r = self.get(url)
1734
        content = r.content
1735
        self.assertEqual(content, (initial_data[:offset] +
1736
                                   source_data[:upto - offset + 1] +
1737
                                   initial_data[upto + 1:]))
1738

    
1739
    def test_update_range_from_invalid_other_object(self):
1740
        src = self.object
1741
        dest = get_random_name()
1742

    
1743
        url = join_urls(self.pithos_path, self.user, self.container, src)
1744
        r = self.get(url)
1745

    
1746
        # update zero length object
1747
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1748
        initial_data = get_random_data()
1749
        length = len(initial_data)
1750
        r = self.put(url, data=initial_data)
1751
        self.assertEqual(r.status_code, 201)
1752

    
1753
        offset = random.randint(1, length - 2)
1754
        upto = random.randint(offset, length - 1)
1755

    
1756
        # source object does not start with /
1757
        r = self.post(url,
1758
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1759
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1760
        self.assertEqual(r.status_code, 400)
1761

    
1762
        # source object does not exist
1763
        r = self.post(url,
1764
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1765
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1766
        self.assertEqual(r.status_code, 404)
1767

    
1768
    def test_restore_version(self):
1769
        info = self.get_object_info(self.container, self.object)
1770
        v = []
1771
        append = v.append
1772
        append((info['X-Object-Version'],
1773
                int(info['Content-Length']),
1774
                self.object_data))
1775

    
1776
        # update object
1777
        data, r = self.upload_object(self.container, self.object,
1778
                                     length=v[0][1] - 1)[1:]
1779
        self.assertTrue('X-Object-Version' in r)
1780
        append((r['X-Object-Version'], len(data), data))
1781
        # v[0][1] > v[1][1]
1782

    
1783
        # update with the previous version
1784
        url = join_urls(self.pithos_path, self.user, self.container,
1785
                        self.object)
1786
        r = self.post(url,
1787
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1788
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1789
                                                       self.object),
1790
                      HTTP_X_SOURCE_VERSION=v[0][0],
1791
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1792
        self.assertEqual(r.status_code, 204)
1793
        # v[2][1] = v[0][1] > v[1][1]
1794

    
1795
        # check content
1796
        r = self.get(url)
1797
        content = r.content
1798
        self.assertEqual(len(content), v[0][1])
1799
        self.assertEqual(content, self.object_data)
1800
        append((r['X-Object-Version'], len(content), content))
1801

    
1802
        # update object content(v4) > content(v2)
1803
        data, r = self.upload_object(self.container, self.object,
1804
                                     length=v[2][1] + 1)[1:]
1805
        self.assertTrue('X-Object-Version' in r)
1806
        append((r['X-Object-Version'], len(data), data))
1807
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1808

    
1809
        # update with the previous version
1810
        r = self.post(url,
1811
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1812
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1813
                                                       self.object),
1814
                      HTTP_X_SOURCE_VERSION=v[2][0],
1815
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1816
        self.assertEqual(r.status_code, 204)
1817
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1818

    
1819
        # check content
1820
        r = self.get(url)
1821
        data = r.content
1822
        self.assertEqual(data, v[2][2])
1823
        append((r['X-Object-Version'], len(data), data))
1824

    
1825
    def test_update_from_other_version(self):
1826
        versions = []
1827
        info = self.get_object_info(self.container, self.object)
1828
        versions.append(info['X-Object-Version'])
1829
        pre_length = int(info['Content-Length'])
1830

    
1831
        # update object
1832
        d1, r = self.upload_object(self.container, self.object,
1833
                                   length=pre_length - 1)[1:]
1834
        self.assertTrue('X-Object-Version' in r)
1835
        versions.append(r['X-Object-Version'])
1836

    
1837
        # update object
1838
        d2, r = self.upload_object(self.container, self.object,
1839
                                   length=pre_length - 2)[1:]
1840
        self.assertTrue('X-Object-Version' in r)
1841
        versions.append(r['X-Object-Version'])
1842

    
1843
        # get previous version
1844
        url = join_urls(self.pithos_path, self.user, self.container,
1845
                        self.object)
1846
        r = self.get('%s?version=list&format=json' % url)
1847
        self.assertEqual(r.status_code, 200)
1848
        l = json.loads(r.content)['versions']
1849
        self.assertEqual(len(l), 3)
1850
        self.assertEqual([str(v[0]) for v in l], versions)
1851

    
1852
        # update with the previous version
1853
        r = self.post(url,
1854
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1855
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1856
                                                       self.object),
1857
                      HTTP_X_SOURCE_VERSION=versions[0])
1858
        self.assertEqual(r.status_code, 204)
1859

    
1860
        # check content
1861
        r = self.get(url)
1862
        content = r.content
1863
        self.assertEqual(len(content), pre_length)
1864
        self.assertEqual(content, self.object_data)
1865

    
1866
        # update object
1867
        d3, r = self.upload_object(self.container, self.object,
1868
                                   length=len(d2) + 1)[1:]
1869
        self.assertTrue('X-Object-Version' in r)
1870
        versions.append(r['X-Object-Version'])
1871

    
1872
        # update with the previous version
1873
        r = self.post(url,
1874
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1875
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1876
                                                       self.object),
1877
                      HTTP_X_SOURCE_VERSION=versions[-2])
1878
        self.assertEqual(r.status_code, 204)
1879

    
1880
        # check content
1881
        r = self.get(url)
1882
        content = r.content
1883
        self.assertEqual(content, d2 + d3[-1])
1884

    
1885
    def test_update_invalid_permissions(self):
1886
        url = join_urls(self.pithos_path, self.user, self.container,
1887
                        self.object)
1888
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1889
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1890
        self.assertEqual(r.status_code, 400)
1891

    
1892
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1893
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1894
        self.assertEqual(r.status_code, 400)
1895

    
1896
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1897
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1898
        self.assertEqual(r.status_code, 400)
1899

    
1900
class ObjectDelete(PithosAPITest):
1901
    def setUp(self):
1902
        PithosAPITest.setUp(self)
1903
        self.container = 'c1'
1904
        self.create_container(self.container)
1905
        self.object, self.object_data = self.upload_object(self.container)[:2]
1906

    
1907
    def test_delete(self):
1908
        url = join_urls(self.pithos_path, self.user, self.container,
1909
                        self.object)
1910
        r = self.delete(url)
1911
        self.assertEqual(r.status_code, 204)
1912

    
1913
        r = self.head(url)
1914
        self.assertEqual(r.status_code, 404)
1915

    
1916
    def test_delete_non_existent(self):
1917
        url = join_urls(self.pithos_path, self.user, self.container,
1918
                        get_random_name())
1919
        r = self.delete(url)
1920
        self.assertEqual(r.status_code, 404)
1921

    
1922
    def test_delete_dir(self):
1923
        folder = self.create_folder(self.container)[0]
1924
        subfolder = self.create_folder(
1925
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1926
        objects = [subfolder]
1927
        append = objects.append
1928
        append(self.upload_object(self.container,
1929
                                  '%s/%s' % (folder, get_random_name()),
1930
                                  depth='1')[0])
1931
        append(self.upload_object(self.container,
1932
                                  '%s/%s' % (subfolder, get_random_name()),
1933
                                  depth='2')[0])
1934
        other = self.upload_object(self.container, strnextling(folder))[0]
1935

    
1936
        # move dir
1937
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1938
        r = self.delete('%s?delimiter=/' % url)
1939
        self.assertEqual(r.status_code, 204)
1940

    
1941
        for obj in objects:
1942
            # assert object does not exist
1943
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1944
            r = self.head(url)
1945
            self.assertEqual(r.status_code, 404)
1946

    
1947
        # assert other has not been deleted
1948
        url = join_urls(self.pithos_path, self.user, self.container, other)
1949
        r = self.head(url)
1950
        self.assertEqual(r.status_code, 200)