Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 7e402b46

History | View | Annotate | Download (78.1 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote, unquote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS, pithos_test_settings)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def test_get_object_meta(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, self.user, cname, oname)
68
        r = self.head(url)
69

    
70
        mandatory = ['Etag',
71
                     'Content-Length',
72
                     'Content-Type',
73
                     'Last-Modified',
74
                     'X-Object-Hash',
75
                     'X-Object-UUID',
76
                     'X-Object-Version',
77
                     'X-Object-Version-Timestamp',
78
                     'X-Object-Modified-By']
79
        for i in mandatory:
80
            self.assertTrue(i in r)
81

    
82
        r = self.post(url, content_type='',
83
                      HTTP_CONTENT_ENCODING='gzip',
84
                      HTTP_CONTENT_DISPOSITION=(
85
                          'attachment; filename="%s"' % oname))
86
        self.assertEqual(r.status_code, 202)
87

    
88
        r = self.head(url)
89
        for i in mandatory:
90
            self.assertTrue(i in r)
91
        self.assertTrue('Content-Encoding' in r)
92
        self.assertEqual(r['Content-Encoding'], 'gzip')
93
        self.assertTrue('Content-Disposition' in r)
94
        self.assertEqual(unquote(r['Content-Disposition']),
95
                         'attachment; filename="%s"' % oname)
96

    
97
        prefix = 'myobject/'
98
        data = ''
99
        for i in range(random.randint(2, 10)):
100
            part = '%s%d' % (prefix, i)
101
            data += self.upload_object(cname, oname=part)[1]
102

    
103
        manifest = '%s/%s' % (cname, prefix)
104
        oname = get_random_name()
105
        url = join_urls(self.pithos_path, self.user, cname, oname)
106
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107
        self.assertEqual(r.status_code, 201)
108

    
109
        r = self.head(url)
110
        for i in mandatory:
111
            self.assertTrue(i in r)
112
        self.assertTrue('X-Object-Manifest' in r)
113
        self.assertEqual(r['X-Object-Manifest'], manifest)
114

    
115

    
116
class ObjectGet(PithosAPITest):
117
    def setUp(self):
118
        PithosAPITest.setUp(self)
119
        self.containers = ['c1', 'c2']
120

    
121
        # create some containers
122
        for c in self.containers:
123
            self.create_container(c)
124

    
125
        # upload files
126
        self.objects = defaultdict(list)
127
        self.objects['c1'].append(self.upload_object('c1')[0])
128

    
129
    def test_versions(self):
130
        c = 'c1'
131
        o = self.objects[c][0]
132
        url = join_urls(self.pithos_path, self.user, c, o)
133

    
134
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135
        r = self.post(url, content_type='', **meta)
136
        self.assertEqual(r.status_code, 202)
137

    
138
        url = join_urls(self.pithos_path, self.user, c, o)
139
        r = self.get('%s?version=list&format=json' % url)
140
        self.assertEqual(r.status_code, 200)
141
        l1 = json.loads(r.content)['versions']
142
        self.assertEqual(len(l1), 2)
143

    
144
        # update meta
145
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147
        r = self.post(url, content_type='', **meta)
148
        self.assertEqual(r.status_code, 202)
149

    
150
        # assert a newly created version has been created
151
        r = self.get('%s?version=list&format=json' % url)
152
        self.assertEqual(r.status_code, 200)
153
        l2 = json.loads(r.content)['versions']
154
        self.assertEqual(len(l2), len(l1) + 1)
155
        self.assertEqual(l2[:-1], l1)
156

    
157
        vserial, _ = l2[-2]
158
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159
                         {'Quality': 'AAA'})
160

    
161
        # update data
162
        self.append_object_data(c, o)
163

    
164
        # assert a newly created version has been created
165
        r = self.get('%s?version=list&format=json' % url)
166
        self.assertEqual(r.status_code, 200)
167
        l3 = json.loads(r.content)['versions']
168
        self.assertEqual(len(l3), len(l2) + 1)
169
        self.assertEqual(l3[:-1], l2)
170

    
171
    def test_get_version(self):
172
        c = 'c1'
173
        o = self.objects[c][0]
174
        url = join_urls(self.pithos_path, self.user, c, o)
175

    
176
        # Update metadata
177
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178
        r = self.post(url, content_type='', **meta)
179
        self.assertEqual(r.status_code, 202)
180

    
181
        url = join_urls(self.pithos_path, self.user, c, o)
182
        r = self.get('%s?version=list&format=json' % url)
183
        self.assertEqual(r.status_code, 200)
184
        l = json.loads(r.content)['versions']
185
        self.assertEqual(len(l), 2)
186

    
187
        r = self.head('%s?version=%s' % (url, l[0][0]))
188
        self.assertEqual(r.status_code, 200)
189
        self.assertTrue('X-Object-Meta-Quality' not in r)
190

    
191
        r = self.head('%s?version=%s' % (url, l[1][0]))
192
        self.assertEqual(r.status_code, 200)
193
        self.assertTrue('X-Object-Meta-Quality' in r)
194

    
195
        # test invalid version
196
        r = self.head('%s?version=-1' % url)
197
        self.assertEqual(r.status_code, 404)
198

    
199
        other_name, other_data, r = self.upload_object(c)
200
        self.assertTrue('X-Object-Version' in r)
201
        other_version = r['X-Object-Version']
202

    
203
        self.assertTrue(o != other_name)
204

    
205
        r = self.get('%s?version=%s' % (url, other_version))
206
        self.assertEqual(r.status_code, 404)
207

    
208
        r = self.head('%s?version=%s' % (url, other_version))
209
        self.assertEqual(r.status_code, 404)
210

    
211
    def test_objects_with_trailing_spaces(self):
212
        # create object
213
        oname = self.upload_object('c1')[0]
214
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215

    
216
        r = self.get(quote('%s ' % url))
217
        self.assertEqual(r.status_code, 404)
218

    
219
        # delete object
220
        self.delete(url)
221

    
222
        r = self.get(url)
223
        self.assertEqual(r.status_code, 404)
224

    
225
        # upload object with trailing space
226
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227

    
228
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229
        r = self.get(url)
230
        self.assertEqual(r.status_code, 200)
231

    
232
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233
        r = self.get(url)
234
        self.assertEqual(r.status_code, 404)
235

    
236
    def test_get_partial(self):
237
        cname = self.containers[0]
238
        oname, odata = self.upload_object(cname, length=512)[:-1]
239
        url = join_urls(self.pithos_path, self.user, cname, oname)
240
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241
        self.assertEqual(r.status_code, 206)
242
        data = r.content
243
        self.assertEqual(data, odata[:500])
244
        self.assertTrue('Content-Range' in r)
245
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246
        self.assertTrue('Content-Type' in r)
247
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248

    
249
    def test_get_final_500(self):
250
        cname = self.containers[0]
251
        oname, odata = self.upload_object(cname, length=512)[:-1]
252
        size = len(odata)
253
        url = join_urls(self.pithos_path, self.user, cname, oname)
254
        r = self.get(url, HTTP_RANGE='bytes=-500')
255
        self.assertEqual(r.status_code, 206)
256
        self.assertEqual(r.content, odata[-500:])
257
        self.assertTrue('Content-Range' in r)
258
        self.assertEqual(r['Content-Range'],
259
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260
        self.assertTrue('Content-Type' in r)
261
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262

    
263
    def test_get_rest(self):
264
        cname = self.containers[0]
265
        oname, odata = self.upload_object(cname, length=512)[:-1]
266
        size = len(odata)
267
        url = join_urls(self.pithos_path, self.user, cname, oname)
268
        offset = len(odata) - random.randint(1, 512)
269
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270
        self.assertEqual(r.status_code, 206)
271
        self.assertEqual(r.content, odata[offset:])
272
        self.assertTrue('Content-Range' in r)
273
        self.assertEqual(r['Content-Range'],
274
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275
        self.assertTrue('Content-Type' in r)
276
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277

    
278
    def test_get_range_not_satisfiable(self):
279
        cname = self.containers[0]
280
        oname, odata = self.upload_object(cname, length=512)[:-1]
281
        url = join_urls(self.pithos_path, self.user, cname, oname)
282

    
283
        # TODO
284
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285
        #self.assertEqual(r.status_code, 416)
286

    
287
        offset = len(odata) + 1
288
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289
        self.assertEqual(r.status_code, 416)
290

    
291
    def test_multiple_range(self):
292
        cname = self.containers[0]
293
        oname, odata = self.upload_object(cname)[:-1]
294
        url = join_urls(self.pithos_path, self.user, cname, oname)
295

    
296
        l = ['0-499', '-500', '1000-']
297
        ranges = 'bytes=%s' % ','.join(l)
298
        r = self.get(url, HTTP_RANGE=ranges)
299
        self.assertEqual(r.status_code, 206)
300
        self.assertTrue('content-type' in r)
301
        p = re.compile(
302
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303
            re.I)
304
        m = p.match(r['content-type'])
305
        if m is None:
306
            self.fail('Invalid multiple range content type')
307
        boundary = m.groupdict()['boundary']
308
        cparts = r.content.split('--%s' % boundary)[1:-1]
309

    
310
        # assert content parts length
311
        self.assertEqual(len(cparts), len(l))
312

    
313
        # for each content part assert headers
314
        i = 0
315
        for cpart in cparts:
316
            content = cpart.split('\r\n')
317
            headers = content[1:3]
318
            content_range = headers[0].split(': ')
319
            self.assertEqual(content_range[0], 'Content-Range')
320

    
321
            r = l[i].split('-')
322
            if not r[0] and not r[1]:
323
                pass
324
            elif not r[0]:
325
                start = len(odata) - int(r[1])
326
                end = len(odata)
327
            elif not r[1]:
328
                start = int(r[0])
329
                end = len(odata)
330
            else:
331
                start = int(r[0])
332
                end = int(r[1]) + 1
333
            fdata = odata[start:end]
334
            sdata = '\r\n'.join(content[4:-1])
335
            self.assertEqual(len(fdata), len(sdata))
336
            self.assertEquals(fdata, sdata)
337
            i += 1
338

    
339
    def test_multiple_range_not_satisfiable(self):
340
        # perform get with multiple range
341
        cname = self.containers[0]
342
        oname, odata = self.upload_object(cname)[:-1]
343
        out_of_range = len(odata) + 1
344
        l = ['0-499', '-500', '%d-' % out_of_range]
345
        ranges = 'bytes=%s' % ','.join(l)
346
        url = join_urls(self.pithos_path, self.user, cname, oname)
347
        r = self.get(url, HTTP_RANGE=ranges)
348
        self.assertEqual(r.status_code, 416)
349

    
350
    def test_get_if_match(self):
351
        cname = self.containers[0]
352
        oname, odata = self.upload_object(cname)[:-1]
353

    
354
        # perform get with If-Match
355
        url = join_urls(self.pithos_path, self.user, cname, oname)
356

    
357
        if pithos_settings.UPDATE_MD5:
358
            etag = md5_hash(odata)
359
        else:
360
            etag = merkle(odata)
361

    
362
        r = self.get(url, HTTP_IF_MATCH=etag)
363

    
364
        # assert get success
365
        self.assertEqual(r.status_code, 200)
366

    
367
        # assert response content
368
        self.assertEqual(r.content, odata)
369

    
370
    def test_get_if_match_star(self):
371
        cname = self.containers[0]
372
        oname, odata = self.upload_object(cname)[:-1]
373

    
374
        # perform get with If-Match *
375
        url = join_urls(self.pithos_path, self.user, cname, oname)
376
        r = self.get(url, HTTP_IF_MATCH='*')
377

    
378
        # assert get success
379
        self.assertEqual(r.status_code, 200)
380

    
381
        # assert response content
382
        self.assertEqual(r.content, odata)
383

    
384
    def test_get_multiple_if_match(self):
385
        cname = self.containers[0]
386
        oname, odata = self.upload_object(cname)[:-1]
387

    
388
        # perform get with If-Match
389
        url = join_urls(self.pithos_path, self.user, cname, oname)
390

    
391
        if pithos_settings.UPDATE_MD5:
392
            etag = md5_hash(odata)
393
        else:
394
            etag = merkle(odata)
395

    
396
        quoted = lambda s: '"%s"' % s
397
        r = self.get(url, HTTP_IF_MATCH=','.join(
398
            [quoted(etag), quoted(get_random_data(64))]))
399

    
400
        # assert get success
401
        self.assertEqual(r.status_code, 200)
402

    
403
        # assert response content
404
        self.assertEqual(r.content, odata)
405

    
406
    def test_if_match_precondition_failed(self):
407
        cname = self.containers[0]
408
        oname, odata = self.upload_object(cname)[:-1]
409

    
410
        # perform get with If-Match
411
        url = join_urls(self.pithos_path, self.user, cname, oname)
412
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413
        self.assertEqual(r.status_code, 412)
414

    
415
    def test_if_none_match(self):
416
        # upload object
417
        cname = self.containers[0]
418
        oname, odata = self.upload_object(cname)[:-1]
419

    
420
        if pithos_settings.UPDATE_MD5:
421
            etag = md5_hash(odata)
422
        else:
423
            etag = merkle(odata)
424

    
425
        # perform get with If-None-Match
426
        url = join_urls(self.pithos_path, self.user, cname, oname)
427
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428

    
429
        # assert precondition_failed
430
        self.assertEqual(r.status_code, 304)
431

    
432
        # update object data
433
        r = self.append_object_data(cname, oname)[-1]
434
        self.assertTrue(etag != r['ETag'])
435

    
436
        # perform get with If-None-Match
437
        url = join_urls(self.pithos_path, self.user, cname, oname)
438
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439

    
440
        # assert get success
441
        self.assertEqual(r.status_code, 200)
442

    
443
    def test_if_none_match_star(self):
444
        # upload object
445
        cname = self.containers[0]
446
        oname, odata = self.upload_object(cname)[:-1]
447

    
448
        # perform get with If-None-Match with star
449
        url = join_urls(self.pithos_path, self.user, cname, oname)
450
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451

    
452
        self.assertEqual(r.status_code, 304)
453

    
454
    def test_if_modified_since(self):
455
        # upload object
456
        cname = self.containers[0]
457
        oname, odata = self.upload_object(cname)[:-1]
458
        object_info = self.get_object_info(cname, oname)
459
        last_modified = object_info['Last-Modified']
460
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461
        t1_formats = map(t1.strftime, DATE_FORMATS)
462

    
463
        # Check not modified since
464
        url = join_urls(self.pithos_path, self.user, cname, oname)
465
        for t in t1_formats:
466
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467
            self.assertEqual(r.status_code, 304)
468

    
469
        _time.sleep(1)
470

    
471
        # update object data
472
        appended_data = self.append_object_data(cname, oname)[1]
473

    
474
        # Check modified since
475
        url = join_urls(self.pithos_path, self.user, cname, oname)
476
        for t in t1_formats:
477
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478
            self.assertEqual(r.status_code, 200)
479
            self.assertEqual(r.content, odata + appended_data)
480

    
481
    def test_if_modified_since_invalid_date(self):
482
        cname = self.containers[0]
483
        oname, odata = self.upload_object(cname)[:-1]
484
        url = join_urls(self.pithos_path, self.user, cname, oname)
485
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486
        self.assertEqual(r.status_code, 200)
487
        self.assertEqual(r.content, odata)
488

    
489
    def test_if_not_modified_since(self):
490
        cname = self.containers[0]
491
        oname, odata = self.upload_object(cname)[:-1]
492
        url = join_urls(self.pithos_path, self.user, cname, oname)
493
        object_info = self.get_object_info(cname, oname)
494
        last_modified = object_info['Last-Modified']
495
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496

    
497
        # Check unmodified
498
        t1 = t + datetime.timedelta(seconds=1)
499
        t1_formats = map(t1.strftime, DATE_FORMATS)
500
        for t in t1_formats:
501
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502
            self.assertEqual(r.status_code, 200)
503
            self.assertEqual(r.content, odata)
504

    
505
        # modify object
506
        _time.sleep(2)
507
        self.append_object_data(cname, oname)
508

    
509
        object_info = self.get_object_info(cname, oname)
510
        last_modified = object_info['Last-Modified']
511
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512
        t2 = t - datetime.timedelta(seconds=1)
513
        t2_formats = map(t2.strftime, DATE_FORMATS)
514

    
515
        # check modified
516
        for t in t2_formats:
517
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518
            self.assertEqual(r.status_code, 412)
519

    
520
        # modify account: update object meta
521
        _time.sleep(1)
522
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523

    
524
        object_info = self.get_object_info(cname, oname)
525
        last_modified = object_info['Last-Modified']
526
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527
        t3 = t - datetime.timedelta(seconds=1)
528
        t3_formats = map(t3.strftime, DATE_FORMATS)
529

    
530
        # check modified
531
        for t in t3_formats:
532
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533
            self.assertEqual(r.status_code, 412)
534

    
535
    def test_if_unmodified_since(self):
536
        cname = self.containers[0]
537
        oname, odata = self.upload_object(cname)[:-1]
538
        url = join_urls(self.pithos_path, self.user, cname, oname)
539
        object_info = self.get_object_info(cname, oname)
540
        last_modified = object_info['Last-Modified']
541
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542
        t = t + datetime.timedelta(seconds=1)
543
        t_formats = map(t.strftime, DATE_FORMATS)
544

    
545
        for tf in t_formats:
546
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547
            self.assertEqual(r.status_code, 200)
548
            self.assertEqual(r.content, odata)
549

    
550
    def test_if_unmodified_since_precondition_failed(self):
551
        cname = self.containers[0]
552
        oname, odata = self.upload_object(cname)[:-1]
553
        url = join_urls(self.pithos_path, self.user, cname, oname)
554
        object_info = self.get_object_info(cname, oname)
555
        last_modified = object_info['Last-Modified']
556
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557
        t = t - datetime.timedelta(seconds=1)
558
        t_formats = map(t.strftime, DATE_FORMATS)
559

    
560
        for tf in t_formats:
561
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562
            self.assertEqual(r.status_code, 412)
563

    
564
    def test_hashes(self):
565
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566
        cname = self.containers[0]
567
        oname, odata = self.upload_object(cname, length=l)[:-1]
568
        size = len(odata)
569

    
570
        url = join_urls(self.pithos_path, self.user, cname, oname)
571
        r = self.get('%s?format=json&hashmap' % url)
572
        self.assertEqual(r.status_code, 200)
573
        body = json.loads(r.content)
574

    
575
        hashes = body['hashes']
576
        block_size = body['block_size']
577
        block_num = size / block_size if size / block_size == 0 else\
578
            size / block_size + 1
579
        self.assertTrue(len(hashes), block_num)
580
        i = 0
581
        for h in hashes:
582
            start = i * block_size
583
            end = (i + 1) * block_size
584
            hash = merkle(odata[start:end])
585
            self.assertEqual(h, hash)
586
            i += 1
587

    
588

    
589
class ObjectPut(PithosAPITest):
590
    def setUp(self):
591
        PithosAPITest.setUp(self)
592
        self.container = get_random_name()
593
        self.create_container(self.container)
594

    
595
    def test_upload(self):
596
        cname = self.container
597
        oname = get_random_name()
598
        data = get_random_data()
599
        meta = {'test': 'test1'}
600
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601
                       for k, v in meta.iteritems())
602
        headers['HTTP_CONTENT_DISPOSITION'] = 'attachment; filename="%f2"'
603
        url = join_urls(self.pithos_path, self.user, cname, oname)
604
        r = self.put(url, data=data, content_type='application/pdf',
605
                     quote_extra=False, **headers)
606
        self.assertEqual(r.status_code, 400)
607

    
608
        headers['HTTP_CONTENT_DISPOSITION'] = ('attachment; filename="%s"' %
609
                                               oname)
610
        url = join_urls(self.pithos_path, self.user, cname, oname)
611
        r = self.put(url, data=data, content_type='application/pdf', **headers)
612
        self.assertEqual(r.status_code, 201)
613
        self.assertTrue('ETag' in r)
614
        self.assertTrue('X-Object-Version' in r)
615

    
616
        info = self.get_object_info(cname, oname)
617

    
618
        # assert object meta
619
        self.assertTrue('X-Object-Meta-Test' in info)
620
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
621

    
622
        # assert content-type
623
        self.assertEqual(info['content-type'], 'application/pdf')
624

    
625
        # assert uploaded content
626
        r = self.get(url)
627
        self.assertEqual(r.status_code, 200)
628
        self.assertEqual(r.content, data)
629

    
630
    def test_maximum_upload_size_exceeds(self):
631
        cname = self.container
632
        oname = get_random_name()
633

    
634
        # set container quota to 100
635
        url = join_urls(self.pithos_path, self.user, cname)
636
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
637
        self.assertEqual(r.status_code, 202)
638

    
639
        info = self.get_container_info(cname)
640
        length = int(info['X-Container-Policy-Quota']) + 1
641

    
642
        data = get_random_data(length)
643
        url = join_urls(self.pithos_path, self.user, cname, oname)
644
        r = self.put(url, data=data)
645
        self.assertEqual(r.status_code, 413)
646

    
647
    def test_upload_with_name_containing_slash(self):
648
        cname = self.container
649
        oname = '/%s' % get_random_name()
650
        data = get_random_data()
651
        url = join_urls(self.pithos_path, self.user, cname, oname)
652
        r = self.put(url, data=data)
653
        self.assertEqual(r.status_code, 201)
654
        self.assertTrue('ETag' in r)
655
        self.assertTrue('X-Object-Version' in r)
656

    
657
        r = self.get(url)
658
        self.assertEqual(r.status_code, 200)
659
        self.assertEqual(r.content, data)
660

    
661
    def test_upload_unprocessable_entity(self):
662
        cname = self.container
663
        oname = get_random_name()
664
        data = get_random_data()
665
        url = join_urls(self.pithos_path, self.user, cname, oname)
666
        r = self.put(url, data=data, HTTP_ETAG='123')
667
        self.assertEqual(r.status_code, 422)
668

    
669
#    def test_chunked_transfer(self):
670
#        cname = self.container
671
#        oname = '/%s' % get_random_name()
672
#        data = get_random_data()
673
#        url = join_urls(self.pithos_path, self.user, cname, oname)
674
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
675
#        self.assertEqual(r.status_code, 201)
676
#        self.assertTrue('ETag' in r)
677
#        self.assertTrue('X-Object-Version' in r)
678

    
679
    def test_manifestation(self):
680
        cname = self.container
681
        prefix = 'myobject/'
682
        data = ''
683
        for i in range(random.randint(2, 10)):
684
            part = '%s%d' % (prefix, i)
685
            data += self.upload_object(cname, oname=part)[1]
686

    
687
        manifest = '%s/%s' % (cname, prefix)
688
        oname = get_random_name()
689
        url = join_urls(self.pithos_path, self.user, cname, oname)
690
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
691
        self.assertEqual(r.status_code, 201)
692

    
693
        # assert object exists
694
        r = self.get(url)
695
        self.assertEqual(r.status_code, 200)
696

    
697
        # assert its content
698
        self.assertEqual(r.content, data)
699

    
700
        # invalid manifestation
701
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
702
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
703
        r = self.get(url)
704
        self.assertEqual(r.content, '')
705

    
706
    def test_create_zero_length_object(self):
707
        cname = self.container
708
        oname = get_random_name()
709
        url = join_urls(self.pithos_path, self.user, cname, oname)
710
        r = self.put(url, data='')
711
        self.assertEqual(r.status_code, 201)
712

    
713
        r = self.get(url)
714
        self.assertEqual(r.status_code, 200)
715
        self.assertEqual(int(r['Content-Length']), 0)
716
        self.assertEqual(r.content, '')
717

    
718
        r = self.get('%s?hashmap=&format=json' % url)
719
        self.assertEqual(r.status_code, 200)
720
        body = json.loads(r.content)
721
        hashes = body['hashes']
722
        hash = merkle('')
723
        self.assertEqual(hashes, [hash])
724

    
725
    def test_create_object_by_hashmap(self):
726
        cname = self.container
727
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
728

    
729
        # upload an object
730
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
731
        # get it hashmap
732
        url = join_urls(self.pithos_path, self.user, cname, oname)
733
        r = self.get('%s?hashmap=&format=json' % url)
734

    
735
        oname = get_random_name()
736
        url = join_urls(self.pithos_path, self.user, cname, oname)
737
        r = self.put('%s?hashmap=' % url, data=r.content)
738
        self.assertEqual(r.status_code, 201)
739

    
740
        r = self.get(url)
741
        self.assertEqual(r.status_code, 200)
742
        self.assertEqual(r.content, data)
743

    
744
    def test_create_object_by_invalid_hashmap(self):
745
        cname = self.container
746
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
747

    
748
        # upload an object
749
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
750
        # get it hashmap
751
        url = join_urls(self.pithos_path, self.user, cname, oname)
752
        r = self.get('%s?hashmap=&format=json' % url)
753
        data = r.content
754
        try:
755
            hashmap = json.loads(data)
756
        except:
757
            self.fail('JSON format expected')
758

    
759
        oname = get_random_name()
760
        url = join_urls(self.pithos_path, self.user, cname, oname)
761
        hashmap['hashes'] = [get_random_name()]
762
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
763
        self.assertEqual(r.status_code, 400)
764

    
765

    
766
class ObjectPutCopy(PithosAPITest):
767
    def setUp(self):
768
        PithosAPITest.setUp(self)
769
        self.container = 'c1'
770
        self.create_container(self.container)
771
        self.object, self.data = self.upload_object(self.container)[:-1]
772

    
773
        url = join_urls(
774
            self.pithos_path, self.user, self.container, self.object)
775
        r = self.head(url)
776
        self.etag = r['X-Object-Hash']
777

    
778
    def test_copy(self):
779
        with AssertMappingInvariant(self.get_object_info, self.container,
780
                                    self.object):
781
            # copy object
782
            oname = get_random_name()
783
            url = join_urls(self.pithos_path, self.user, self.container, oname)
784
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
785
                         HTTP_X_COPY_FROM='/%s/%s' % (
786
                             self.container, self.object))
787

    
788
            # assert copy success
789
            self.assertEqual(r.status_code, 201)
790

    
791
            # assert access the new object
792
            r = self.head(url)
793
            self.assertEqual(r.status_code, 200)
794
            self.assertTrue('X-Object-Meta-Test' in r)
795
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
796

    
797
            # assert etag is the same
798
            self.assertTrue('X-Object-Hash' in r)
799
            self.assertEqual(r['X-Object-Hash'], self.etag)
800

    
801
    def test_copy_from_different_container(self):
802
        cname = 'c2'
803
        self.create_container(cname)
804
        with AssertMappingInvariant(self.get_object_info, self.container,
805
                                    self.object):
806
            oname = get_random_name()
807
            url = join_urls(self.pithos_path, self.user, cname, oname)
808
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
809
                         HTTP_X_COPY_FROM='/%s/%s' % (
810
                             self.container, self.object))
811

    
812
            # assert copy success
813
            self.assertEqual(r.status_code, 201)
814

    
815
            # assert access the new object
816
            r = self.head(url)
817
            self.assertEqual(r.status_code, 200)
818
            self.assertTrue('X-Object-Meta-Test' in r)
819
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
820

    
821
            # assert etag is the same
822
            self.assertTrue('X-Object-Hash' in r)
823
            self.assertEqual(r['X-Object-Hash'], self.etag)
824

    
825
    def test_copy_from_other_account(self):
826
        cname = 'c2'
827
        self.create_container(cname, user='chuck')
828
        self.create_container(cname, user='alice')
829

    
830
        # share object for read with alice
831
        url = join_urls(self.pithos_path, self.user, self.container,
832
                        self.object)
833
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
834
                      HTTP_X_OBJECT_SHARING='read=alice')
835
        self.assertEqual(r.status_code, 202)
836

    
837
        # assert not allowed for chuck
838
        oname = get_random_name()
839
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
840
        r = self.put(url, data='', user='chuck',
841
                     HTTP_X_OBJECT_META_TEST='testcopy',
842
                     HTTP_X_COPY_FROM='/%s/%s' % (
843
                         self.container, self.object),
844
                     HTTP_X_SOURCE_ACCOUNT='user')
845

    
846
        self.assertEqual(r.status_code, 403)
847

    
848
        # assert copy success for alice
849
        url = join_urls(self.pithos_path, 'alice', cname, oname)
850
        r = self.put(url, data='', user='alice',
851
                     HTTP_X_OBJECT_META_TEST='testcopy',
852
                     HTTP_X_COPY_FROM='/%s/%s' % (
853
                         self.container, self.object),
854
                     HTTP_X_SOURCE_ACCOUNT='user')
855
        self.assertEqual(r.status_code, 201)
856

    
857
        # assert access the new object
858
        r = self.head(url, user='alice')
859
        self.assertEqual(r.status_code, 200)
860
        self.assertTrue('X-Object-Meta-Test' in r)
861
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
862

    
863
        # assert etag is the same
864
        self.assertTrue('X-Object-Hash' in r)
865
        self.assertEqual(r['X-Object-Hash'], self.etag)
866

    
867
        # share object for write
868
        url = join_urls(self.pithos_path, self.user, self.container,
869
                        self.object)
870
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
871
                      HTTP_X_OBJECT_SHARING='write=dan')
872
        self.assertEqual(r.status_code, 202)
873

    
874
        # assert not allowed copy for alice
875
        url = join_urls(self.pithos_path, 'alice', cname, oname)
876
        r = self.put(url, data='', user='alice',
877
                     HTTP_X_OBJECT_META_TEST='testcopy',
878
                     HTTP_X_COPY_FROM='/%s/%s' % (
879
                         self.container, self.object),
880
                     HTTP_X_SOURCE_ACCOUNT='user')
881
        self.assertEqual(r.status_code, 403)
882

    
883
        # assert allowed copy for dan
884
        self.create_container(cname, user='dan')
885
        url = join_urls(self.pithos_path, 'dan', cname, oname)
886
        r = self.put(url, data='', user='dan',
887
                     HTTP_X_OBJECT_META_TEST='testcopy',
888
                     HTTP_X_COPY_FROM='/%s/%s' % (
889
                         self.container, self.object),
890
                     HTTP_X_SOURCE_ACCOUNT='user')
891
        self.assertEqual(r.status_code, 201)
892

    
893
        # assert access the new object
894
        r = self.head(url, user='dan')
895
        self.assertEqual(r.status_code, 200)
896
        self.assertTrue('X-Object-Meta-Test' in r)
897
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
898

    
899
        # assert etag is the same
900
        self.assertTrue('X-Object-Hash' in r)
901
        self.assertEqual(r['X-Object-Hash'], self.etag)
902

    
903
        # assert source object still exists
904
        url = join_urls(self.pithos_path, self.user, self.container,
905
                        self.object)
906
        r = self.head(url)
907
        self.assertEqual(r.status_code, 200)
908
        # assert etag is the same
909
        self.assertTrue('X-Object-Hash' in r)
910
        self.assertEqual(r['X-Object-Hash'], self.etag)
911

    
912
        r = self.get(url)
913
        self.assertEqual(r.status_code, 200)
914
        # assert etag is the same
915
        self.assertTrue('X-Object-Hash' in r)
916
        self.assertEqual(r['X-Object-Hash'], self.etag)
917

    
918
    def test_copy_invalid(self):
919
        # copy from non-existent object
920
        oname = get_random_name()
921
        url = join_urls(self.pithos_path, self.user, self.container, oname)
922
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
923
                     HTTP_X_COPY_FROM='/%s/%s' % (
924
                         self.container, get_random_name()))
925
        self.assertEqual(r.status_code, 404)
926

    
927
        # copy from non-existent container
928
        oname = get_random_name()
929
        url = join_urls(self.pithos_path, self.user, self.container, oname)
930
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
931
                     HTTP_X_COPY_FROM='/%s/%s' % (
932
                         get_random_name(), self.object))
933
        self.assertEqual(r.status_code, 404)
934

    
935
    @pithos_test_settings(API_LIST_LIMIT=10)
936
    def test_copy_dir(self):
937
        folder = self.create_folder(self.container)[0]
938
        subfolder = self.create_folder(
939
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
940
        objects = [subfolder]
941
        append = objects.append
942
        for i in range(11):
943
            append(self.upload_object(self.container,
944
                                      '%s/%d' % (folder, i),
945
                                      depth='1')[0])
946
        other = self.upload_object(self.container, strnextling(folder))[0]
947

    
948
        # copy dir
949
        copy_folder = self.create_folder(self.container)[0]
950
        url = join_urls(self.pithos_path, self.user, self.container,
951
                        copy_folder)
952
        r = self.put('%s?delimiter=/' % url, data='',
953
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
954
        self.assertEqual(r.status_code, 201)
955

    
956
        for obj in objects:
957
            # assert object exists
958
            url = join_urls(self.pithos_path, self.user, self.container,
959
                            obj.replace(folder, copy_folder))
960
            r = self.head(url)
961
            self.assertEqual(r.status_code, 200)
962

    
963
            # assert metadata
964
            meta = self.get_object_meta(self.container, obj)
965
            for k in meta.keys():
966
                key = 'X-Object-Meta-%s' % k
967
                self.assertTrue(key in r)
968
                self.assertEqual(r[key], meta[k])
969

    
970
        # assert other has not been created under copy folder
971
        url = join_urls(self.pithos_path, self.user, self.container,
972
                        '%s/%s' % (copy_folder,
973
                                   other.replace(folder, copy_folder)))
974
        r = self.head(url)
975
        self.assertEqual(r.status_code, 404)
976

    
977

    
978
class ObjectPutMove(PithosAPITest):
979
    def setUp(self):
980
        PithosAPITest.setUp(self)
981
        self.container = 'c1'
982
        self.create_container(self.container)
983
        self.object, self.data = self.upload_object(self.container)[:-1]
984

    
985
        url = join_urls(
986
            self.pithos_path, self.user, self.container, self.object)
987
        r = self.head(url)
988
        self.etag = r['X-Object-Hash']
989

    
990
    def test_move(self):
991
        # move object
992
        oname = get_random_name()
993
        url = join_urls(self.pithos_path, self.user, self.container, oname)
994
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
995
                     HTTP_X_MOVE_FROM='/%s/%s' % (
996
                         self.container, self.object))
997

    
998
        # assert move success
999
        self.assertEqual(r.status_code, 201)
1000

    
1001
        # assert access the new object
1002
        r = self.head(url)
1003
        self.assertEqual(r.status_code, 200)
1004
        self.assertTrue('X-Object-Meta-Test' in r)
1005
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1006

    
1007
        # assert etag is the same
1008
        self.assertTrue('X-Object-Hash' in r)
1009

    
1010
        # assert the initial object has been deleted
1011
        url = join_urls(self.pithos_path, self.user, self.container,
1012
                        self.object)
1013
        r = self.head(url)
1014
        self.assertEqual(r.status_code, 404)
1015

    
1016
    @pithos_test_settings(API_LIST_LIMIT=10)
1017
    def test_move_dir(self):
1018
        folder = self.create_folder(self.container)[0]
1019
        subfolder = self.create_folder(
1020
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1021
        objects = [subfolder]
1022
        append = objects.append
1023
        for i in range(11):
1024
            append(self.upload_object(self.container,
1025
                                      '%s/%d' % (folder, i),
1026
                                      depth='1')[0])
1027
        other = self.upload_object(self.container, strnextling(folder))[0]
1028

    
1029
        # move dir
1030
        copy_folder = self.create_folder(self.container)[0]
1031
        url = join_urls(self.pithos_path, self.user, self.container,
1032
                        copy_folder)
1033
        r = self.put('%s?delimiter=/' % url, data='',
1034
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1035
        self.assertEqual(r.status_code, 201)
1036

    
1037
        for obj in objects:
1038
            # assert initial object does not exist
1039
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1040
            r = self.head(url)
1041
            self.assertEqual(r.status_code, 404)
1042

    
1043
            # assert new object was created
1044
            url = join_urls(self.pithos_path, self.user, self.container,
1045
                            obj.replace(folder, copy_folder))
1046
            r = self.head(url)
1047
            self.assertEqual(r.status_code, 200)
1048

    
1049
        # assert other has not been created under copy folder
1050
        url = join_urls(self.pithos_path, self.user, self.container,
1051
                        '%s/%s' % (copy_folder,
1052
                                   other.replace(folder, copy_folder)))
1053
        r = self.head(url)
1054
        self.assertEqual(r.status_code, 404)
1055

    
1056
    def test_move_from_other_account(self):
1057
        cname = 'c2'
1058
        self.create_container(cname, user='chuck')
1059
        self.create_container(cname, user='alice')
1060

    
1061
        # share object for read with alice
1062
        url = join_urls(self.pithos_path, self.user, self.container,
1063
                        self.object)
1064
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1065
                      HTTP_X_OBJECT_SHARING='read=alice')
1066
        self.assertEqual(r.status_code, 202)
1067

    
1068
        # assert not allowed move for chuck
1069
        oname = get_random_name()
1070
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1071
        r = self.put(url, data='', user='chuck',
1072
                     HTTP_X_OBJECT_META_TEST='testcopy',
1073
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1074
                         self.container, self.object),
1075
                     HTTP_X_SOURCE_ACCOUNT='user')
1076

    
1077
        self.assertEqual(r.status_code, 403)
1078

    
1079
        # assert no new object was created
1080
        r = self.head(url, user='chuck')
1081
        self.assertEqual(r.status_code, 404)
1082

    
1083
        # assert not allowed move for alice
1084
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1085
        r = self.put(url, data='', user='alice',
1086
                     HTTP_X_OBJECT_META_TEST='testcopy',
1087
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1088
                         self.container, self.object),
1089
                     HTTP_X_SOURCE_ACCOUNT='user')
1090
        self.assertEqual(r.status_code, 403)
1091

    
1092
        # assert no new object was created
1093
        r = self.head(url, user='alice')
1094
        self.assertEqual(r.status_code, 404)
1095

    
1096
        # share object for write with dan
1097
        url = join_urls(self.pithos_path, self.user, self.container,
1098
                        self.object)
1099
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1100
                      HTTP_X_OBJECT_SHARING='write=dan')
1101
        self.assertEqual(r.status_code, 202)
1102

    
1103
        # assert not allowed move for alice
1104
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1105
        r = self.put(url, data='', user='alice',
1106
                     HTTP_X_OBJECT_META_TEST='testcopy',
1107
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1108
                         self.container, self.object),
1109
                     HTTP_X_SOURCE_ACCOUNT='user')
1110
        self.assertEqual(r.status_code, 403)
1111

    
1112
        # assert no new object was created
1113
        r = self.head(url, user='alice')
1114
        self.assertEqual(r.status_code, 404)
1115

    
1116
        # assert not allowed move for dan
1117
        self.create_container(cname, user='dan')
1118
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1119
        r = self.put(url, data='', user='dan',
1120
                     HTTP_X_OBJECT_META_TEST='testcopy',
1121
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1122
                         self.container, self.object),
1123
                     HTTP_X_SOURCE_ACCOUNT='user')
1124
        self.assertEqual(r.status_code, 403)
1125

    
1126
        # assert no new object was created
1127
        r = self.head(url, user='dan')
1128
        self.assertEqual(r.status_code, 404)
1129

    
1130

    
1131
class ObjectCopy(PithosAPITest):
1132
    def setUp(self):
1133
        PithosAPITest.setUp(self)
1134
        self.container = 'c1'
1135
        self.create_container(self.container)
1136
        self.object, self.data = self.upload_object(self.container)[:-1]
1137

    
1138
        url = join_urls(
1139
            self.pithos_path, self.user, self.container, self.object)
1140
        r = self.head(url)
1141
        self.etag = r['X-Object-Hash']
1142

    
1143
    def test_copy(self):
1144
        with AssertMappingInvariant(self.get_object_info, self.container,
1145
                                    self.object):
1146
            oname = get_random_name()
1147
            # copy object
1148
            url = join_urls(self.pithos_path, self.user, self.container,
1149
                            self.object)
1150
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1151
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1152
                                                       oname))
1153
            # assert copy success
1154
            url = join_urls(self.pithos_path, self.user, self.container,
1155
                            oname)
1156
            self.assertEqual(r.status_code, 201)
1157

    
1158
            # assert access the new object
1159
            r = self.head(url)
1160
            self.assertEqual(r.status_code, 200)
1161
            self.assertTrue('X-Object-Meta-Test' in r)
1162
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1163

    
1164
            # assert etag is the same
1165
            self.assertTrue('X-Object-Hash' in r)
1166
            self.assertEqual(r['X-Object-Hash'], self.etag)
1167

    
1168
            # assert source object still exists
1169
            url = join_urls(self.pithos_path, self.user, self.container,
1170
                            self.object)
1171
            r = self.head(url)
1172
            self.assertEqual(r.status_code, 200)
1173

    
1174
            # assert etag is the same
1175
            self.assertTrue('X-Object-Hash' in r)
1176
            self.assertEqual(r['X-Object-Hash'], self.etag)
1177

    
1178
            r = self.get(url)
1179
            self.assertEqual(r.status_code, 200)
1180

    
1181
            # assert etag is the same
1182
            self.assertTrue('X-Object-Hash' in r)
1183
            self.assertEqual(r['X-Object-Hash'], self.etag)
1184

    
1185
            # copy object to other container (not existing)
1186
            cname = get_random_name()
1187
            url = join_urls(self.pithos_path, self.user, self.container,
1188
                            self.object)
1189
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1190
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1191

    
1192
            # assert destination container does not exist
1193
            url = join_urls(self.pithos_path, self.user, cname,
1194
                            self.object)
1195
            self.assertEqual(r.status_code, 404)
1196

    
1197
            # create container
1198
            self.create_container(cname)
1199

    
1200
            # copy object to other container (existing)
1201
            url = join_urls(self.pithos_path, self.user, self.container,
1202
                            self.object)
1203
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1204
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1205

    
1206
            # assert copy success
1207
            url = join_urls(self.pithos_path, self.user, cname,
1208
                            self.object)
1209
            self.assertEqual(r.status_code, 201)
1210

    
1211
            # assert access the new object
1212
            r = self.head(url)
1213
            self.assertEqual(r.status_code, 200)
1214
            self.assertTrue('X-Object-Meta-Test' in r)
1215
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1216

    
1217
            # assert etag is the same
1218
            self.assertTrue('X-Object-Hash' in r)
1219
            self.assertEqual(r['X-Object-Hash'], self.etag)
1220

    
1221
            # assert source object still exists
1222
            url = join_urls(self.pithos_path, self.user, self.container,
1223
                            self.object)
1224
            r = self.head(url)
1225
            self.assertEqual(r.status_code, 200)
1226

    
1227
            # assert etag is the same
1228
            self.assertTrue('X-Object-Hash' in r)
1229
            self.assertEqual(r['X-Object-Hash'], self.etag)
1230

    
1231
            r = self.get(url)
1232
            self.assertEqual(r.status_code, 200)
1233

    
1234
            # assert etag is the same
1235
            self.assertTrue('X-Object-Hash' in r)
1236
            self.assertEqual(r['X-Object-Hash'], self.etag)
1237

    
1238
    @pithos_test_settings(API_LIST_LIMIT=10)
1239
    def test_copy_dir_contents(self):
1240
        folder = self.create_folder(self.container)[0]
1241
        subfolder = self.create_folder(
1242
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1243
        objects = [subfolder]
1244
        append = objects.append
1245
        for i in range(11):
1246
            append(self.upload_object(self.container,
1247
                                      '%s/%d' % (folder, i),
1248
                                      depth='1')[0])
1249
        other = self.upload_object(self.container, strnextling(folder))[0]
1250

    
1251
        # copy dir
1252
        url = join_urls(self.pithos_path, self.user, self.container,
1253
                        folder)
1254
        copy_folder = self.create_folder(self.container)[0]
1255
        r = self.copy('%s?delimiter=/' % url,
1256
                      HTTP_X_OBJECT_META_TEST='testcopy',
1257
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1258
                                                   copy_folder))
1259
        self.assertEqual(r.status_code, 201)
1260

    
1261
        for obj in objects:
1262
            # assert object exists
1263
            url = join_urls(self.pithos_path, self.user, self.container,
1264
                            obj.replace(folder, copy_folder))
1265
            r = self.head(url)
1266
            self.assertEqual(r.status_code, 200)
1267

    
1268
        # assert other has not been created under copy folder
1269
        url = join_urls(self.pithos_path, self.user, self.container,
1270
                        '%s/%s' % (copy_folder,
1271
                                   other.replace(folder, copy_folder)))
1272
        r = self.head(url)
1273
        self.assertEqual(r.status_code, 404)
1274

    
1275
    def test_copy_to_other_account(self):
1276
        # create a container under alice account
1277
        cname = self.create_container(user='alice')[0]
1278

    
1279
        # create a folder under this container
1280
        folder = self.create_folder(cname, user='alice')[0]
1281

    
1282
        oname = get_random_name()
1283

    
1284
        # copy object to other account container
1285
        url = join_urls(self.pithos_path, self.user, self.container,
1286
                        self.object)
1287
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1288
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1289
                      HTTP_DESTINATION_ACCOUNT='alice')
1290
        self.assertEqual(r.status_code, 403)
1291

    
1292
        # share object for read with user
1293
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1294
        r = self.post(url, user='alice', content_type='',
1295
                      HTTP_CONTENT_RANGE='bytes */*',
1296
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1297
        self.assertEqual(r.status_code, 202)
1298

    
1299
        # assert copy object still is not allowed
1300
        url = join_urls(self.pithos_path, self.user, self.container,
1301
                        self.object)
1302
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1303
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1304
                      HTTP_DESTINATION_ACCOUNT='alice')
1305
        self.assertEqual(r.status_code, 403)
1306

    
1307
        # share object for write with user
1308
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1309
        r = self.post(url, user='alice',  content_type='',
1310
                      HTTP_CONTENT_RANGE='bytes */*',
1311
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1312
        self.assertEqual(r.status_code, 202)
1313

    
1314
        # assert copy object now is allowed
1315
        url = join_urls(self.pithos_path, self.user, self.container,
1316
                        self.object)
1317
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1318
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1319
                      HTTP_DESTINATION_ACCOUNT='alice')
1320
        self.assertEqual(r.status_code, 201)
1321

    
1322
        # assert access the new object
1323
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1324
        r = self.head(url, user='alice')
1325
        self.assertEqual(r.status_code, 200)
1326
        self.assertTrue('X-Object-Meta-Test' in r)
1327
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1328

    
1329
        # assert etag is the same
1330
        self.assertTrue('X-Object-Hash' in r)
1331
        self.assertEqual(r['X-Object-Hash'], self.etag)
1332

    
1333
        # assert source object still exists
1334
        url = join_urls(self.pithos_path, self.user, self.container,
1335
                        self.object)
1336
        r = self.head(url)
1337
        self.assertEqual(r.status_code, 200)
1338

    
1339
        # assert etag is the same
1340
        self.assertTrue('X-Object-Hash' in r)
1341
        self.assertEqual(r['X-Object-Hash'], self.etag)
1342

    
1343
        r = self.get(url)
1344
        self.assertEqual(r.status_code, 200)
1345

    
1346
        # assert etag is the same
1347
        self.assertTrue('X-Object-Hash' in r)
1348
        self.assertEqual(r['X-Object-Hash'], self.etag)
1349

    
1350

    
1351
class ObjectMove(PithosAPITest):
1352
    def setUp(self):
1353
        PithosAPITest.setUp(self)
1354
        self.container = 'c1'
1355
        self.create_container(self.container)
1356
        self.object, self.data = self.upload_object(self.container)[:-1]
1357

    
1358
        url = join_urls(
1359
            self.pithos_path, self.user, self.container, self.object)
1360
        r = self.head(url)
1361
        self.etag = r['X-Object-Hash']
1362

    
1363
    def test_move(self):
1364
        oname = get_random_name()
1365

    
1366
        # move object
1367
        url = join_urls(self.pithos_path, self.user, self.container,
1368
                        self.object)
1369
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1370
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1371
                                                   oname))
1372
        # assert move success
1373
        url = join_urls(self.pithos_path, self.user, self.container,
1374
                        oname)
1375
        self.assertEqual(r.status_code, 201)
1376

    
1377
        # assert access the new object
1378
        r = self.head(url)
1379
        self.assertEqual(r.status_code, 200)
1380
        self.assertTrue('X-Object-Meta-Test' in r)
1381
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1382

    
1383
        # assert etag is the same
1384
        self.assertTrue('X-Object-Hash' in r)
1385
        self.assertEqual(r['X-Object-Hash'], self.etag)
1386

    
1387
        # assert source object does not exist
1388
        url = join_urls(self.pithos_path, self.user, self.container,
1389
                        self.object)
1390
        r = self.head(url)
1391
        self.assertEqual(r.status_code, 404)
1392

    
1393
    @pithos_test_settings(API_LIST_LIMIT=10)
1394
    def test_move_dir_contents(self):
1395
        folder = self.create_folder(self.container)[0]
1396
        subfolder = self.create_folder(
1397
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1398
        objects = [subfolder]
1399
        append = objects.append
1400
        for i in range(11):
1401
            append(self.upload_object(self.container,
1402
                                      '%s/%d' % (folder, i),
1403
                                      depth='1')[0])
1404
        other = self.upload_object(self.container, strnextling(folder))[0]
1405

    
1406
        # copy dir
1407
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1408
        copy_folder = self.create_folder(self.container)[0]
1409
        r = self.move('%s?delimiter=/' % url,
1410
                      HTTP_X_OBJECT_META_TEST='testcopy',
1411
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1412
                                                   copy_folder))
1413
        self.assertEqual(r.status_code, 201)
1414

    
1415
        for obj in objects:
1416
            # assert object exists
1417
            url = join_urls(self.pithos_path, self.user, self.container,
1418
                            obj.replace(folder, copy_folder))
1419
            r = self.head(url)
1420
            self.assertEqual(r.status_code, 200)
1421

    
1422
        # assert other has not been created under copy folder
1423
        url = join_urls(self.pithos_path, self.user, self.container,
1424
                        '%s/%s' % (copy_folder,
1425
                                   other.replace(folder, copy_folder)))
1426
        r = self.head(url)
1427
        self.assertEqual(r.status_code, 404)
1428

    
1429
    def test_move_to_other_container(self):
1430
        # move object to other container (not existing)
1431
        cname = get_random_name()
1432
        url = join_urls(self.pithos_path, self.user, self.container,
1433
                        self.object)
1434
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1435
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1436

    
1437
        # assert destination container does not exist
1438
        url = join_urls(self.pithos_path, self.user, cname,
1439
                        self.object)
1440
        self.assertEqual(r.status_code, 404)
1441

    
1442
        # create container
1443
        self.create_container(cname)
1444

    
1445
        # move object to other container (existing)
1446
        url = join_urls(self.pithos_path, self.user, self.container,
1447
                        self.object)
1448
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1449
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1450

    
1451
        # assert move success
1452
        url = join_urls(self.pithos_path, self.user, cname,
1453
                        self.object)
1454
        self.assertEqual(r.status_code, 201)
1455

    
1456
        # assert access the new object
1457
        r = self.head(url)
1458
        self.assertEqual(r.status_code, 200)
1459
        self.assertTrue('X-Object-Meta-Test' in r)
1460
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1461

    
1462
        # assert etag is the same
1463
        self.assertTrue('X-Object-Hash' in r)
1464
        self.assertEqual(r['X-Object-Hash'], self.etag)
1465

    
1466
        # assert source object does not exist
1467
        url = join_urls(self.pithos_path, self.user, self.container,
1468
                        self.object)
1469
        r = self.head(url)
1470
        self.assertEqual(r.status_code, 404)
1471

    
1472
    def test_move_to_other_account(self):
1473
        # create a container under alice account
1474
        cname = self.create_container(user='alice')[0]
1475

    
1476
        # create a folder under this container
1477
        folder = self.create_folder(cname, user='alice')[0]
1478

    
1479
        oname = get_random_name()
1480

    
1481
        # move object to other account container
1482
        url = join_urls(self.pithos_path, self.user, self.container,
1483
                        self.object)
1484
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1485
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1486
                      HTTP_DESTINATION_ACCOUNT='alice')
1487
        self.assertEqual(r.status_code, 403)
1488

    
1489
        # share object for read with user
1490
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1491
        r = self.post(url, user='alice', content_type='',
1492
                      HTTP_CONTENT_RANGE='bytes */*',
1493
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1494
        self.assertEqual(r.status_code, 202)
1495

    
1496
        # assert move object still is not allowed
1497
        url = join_urls(self.pithos_path, self.user, self.container,
1498
                        self.object)
1499
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1500
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1501
                      HTTP_DESTINATION_ACCOUNT='alice')
1502
        self.assertEqual(r.status_code, 403)
1503

    
1504
        # share object for write with user
1505
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1506
        r = self.post(url, user='alice',  content_type='',
1507
                      HTTP_CONTENT_RANGE='bytes */*',
1508
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1509
        self.assertEqual(r.status_code, 202)
1510

    
1511
        # assert move object now is allowed
1512
        url = join_urls(self.pithos_path, self.user, self.container,
1513
                        self.object)
1514
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1515
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1516
                      HTTP_DESTINATION_ACCOUNT='alice')
1517
        self.assertEqual(r.status_code, 201)
1518

    
1519
        # assert source object does not exist
1520
        url = join_urls(self.pithos_path, self.user, self.container,
1521
                        self.object)
1522
        r = self.head(url)
1523
        self.assertEqual(r.status_code, 404)
1524

    
1525

    
1526
class ObjectPost(PithosAPITest):
1527
    def setUp(self):
1528
        PithosAPITest.setUp(self)
1529
        self.container = 'c1'
1530
        self.create_container(self.container)
1531
        self.object, self.object_data = self.upload_object(self.container)[:2]
1532

    
1533
    def test_update_meta(self):
1534
        with AssertUUidInvariant(self.get_object_info,
1535
                                 self.container,
1536
                                 self.object):
1537
            # update metadata
1538
            d = {'a' * 114: 'b' * 256}
1539
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1540
                          k, v in d.items())
1541
            url = join_urls(self.pithos_path, self.user, self.container,
1542
                            self.object)
1543
            r = self.post(url, content_type='', **kwargs)
1544
            self.assertEqual(r.status_code, 202)
1545

    
1546
            # assert metadata have been updated
1547
            meta = self.get_object_meta(self.container, self.object)
1548

    
1549
            for k, v in d.items():
1550
                self.assertTrue(k.title() in meta)
1551
                self.assertTrue(meta[k.title()], v)
1552

    
1553
            # Header key too large
1554
            d = {'a' * 115: 'b' * 256}
1555
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1556
                          k, v in d.items())
1557
            r = self.post(url, content_type='', **kwargs)
1558
            self.assertEqual(r.status_code, 400)
1559

    
1560
            # Header value too large
1561
            d = {'a' * 114: 'b' * 257}
1562
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1563
                          k, v in d.items())
1564
            r = self.post(url, content_type='', **kwargs)
1565
            self.assertEqual(r.status_code, 400)
1566

    
1567
#            # Check utf-8 meta
1568
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1569
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1570
#                          k, v in d.items())
1571
#            url = join_urls(self.pithos_path, self.user, self.container,
1572
#                            self.object)
1573
#            r = self.post(url, content_type='', **kwargs)
1574
#            self.assertEqual(r.status_code, 202)
1575
#
1576
#            # assert metadata have been updated
1577
#            meta = self.get_object_meta(self.container, self.object)
1578
#
1579
#            for k, v in d.items():
1580
#                key = 'X-Object-Meta-%s' % k.title()
1581
#                self.assertTrue(key in meta)
1582
#                self.assertTrue(meta[key], v)
1583
#
1584
#            # Header key too large
1585
#            d = {'α' * 114: 'β' * (256 / 2)}
1586
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1587
#                          k, v in d.items())
1588
#            r = self.post(url, content_type='', **kwargs)
1589
#            self.assertEqual(r.status_code, 400)
1590
#
1591
#            # Header value too large
1592
#            d = {'α' * 114: 'β' * 256}
1593
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1594
#                          k, v in d.items())
1595
#            r = self.udpate(url, content_type='', **kwargs)
1596
#            self.assertEqual(r.status_code, 400)
1597

    
1598
    def test_update_object(self):
1599
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1600
        oname, odata = self.upload_object(
1601
            self.container, length=random.randint(
1602
                block_size + 2, 2 * block_size))[:2]
1603

    
1604
        length = len(odata)
1605
        first_byte_pos = random.randint(1, block_size)
1606
        last_byte_pos = random.randint(block_size + 1, length - 1)
1607
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1608
        kwargs = {'content_type': 'application/octet-stream',
1609
                  'HTTP_CONTENT_RANGE': range}
1610

    
1611
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1612
        partial = last_byte_pos - first_byte_pos + 1
1613
        data = get_random_data(partial)
1614
        r = self.post(url, data=data, **kwargs)
1615

    
1616
        self.assertEqual(r.status_code, 204)
1617
        self.assertTrue('ETag' in r)
1618
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1619
                                     data)
1620
        if pithos_settings.UPDATE_MD5:
1621
            etag = md5_hash(updated_data)
1622
        else:
1623
            etag = merkle(updated_data)
1624
        #self.assertEqual(r['ETag'], etag)
1625

    
1626
        # check modified object
1627
        r = self.get(url)
1628

    
1629
        self.assertEqual(r.status_code, 200)
1630
        self.assertEqual(r.content, updated_data)
1631
        self.assertEqual(etag, r['ETag'])
1632

    
1633
    def test_update_object_divided_by_blocksize(self):
1634
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1635
        oname, odata = self.upload_object(self.container,
1636
                                          length=2 * block_size)[:2]
1637

    
1638
        length = len(odata)
1639
        first_byte_pos = block_size
1640
        last_byte_pos = 2 * block_size - 1
1641
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1642
        kwargs = {'content_type': 'application/octet-stream',
1643
                  'HTTP_CONTENT_RANGE': range}
1644

    
1645
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1646
        partial = last_byte_pos - first_byte_pos + 1
1647
        data = get_random_data(partial)
1648
        r = self.post(url, data=data, **kwargs)
1649

    
1650
        self.assertEqual(r.status_code, 204)
1651
        self.assertTrue('ETag' in r)
1652
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1653
                                     data)
1654
        if pithos_settings.UPDATE_MD5:
1655
            etag = md5_hash(updated_data)
1656
        else:
1657
            etag = merkle(updated_data)
1658
        #self.assertEqual(r['ETag'], etag)
1659

    
1660
        # check modified object
1661
        r = self.get(url)
1662

    
1663
        self.assertEqual(r.status_code, 200)
1664
        self.assertEqual(r.content, updated_data)
1665
        self.assertEqual(etag, r['ETag'])
1666

    
1667
    def test_update_object_invalid_content_length(self):
1668
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1669
        oname, odata = self.upload_object(
1670
            self.container, length=random.randint(
1671
                block_size + 1, 2 * block_size))[:2]
1672

    
1673
        length = len(odata)
1674
        first_byte_pos = random.randint(1, block_size)
1675
        last_byte_pos = random.randint(block_size + 1, length - 1)
1676
        partial = last_byte_pos - first_byte_pos + 1
1677
        data = get_random_data(partial)
1678
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1679
        kwargs = {'content_type': 'application/octet-stream',
1680
                  'HTTP_CONTENT_RANGE': range,
1681
                  'CONTENT_LENGTH': str(partial + 1)}
1682

    
1683
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1684
        r = self.post(url, data=data, **kwargs)
1685

    
1686
        self.assertEqual(r.status_code, 400)
1687

    
1688
    def test_update_object_invalid_range(self):
1689
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1690
        oname, odata = self.upload_object(
1691
            self.container, length=random.randint(block_size + 1,
1692
                                                  2 * block_size))[:2]
1693

    
1694
        length = len(odata)
1695
        first_byte_pos = random.randint(1, block_size)
1696
        last_byte_pos = first_byte_pos - 1
1697
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1698
        kwargs = {'content_type': 'application/octet-stream',
1699
                  'HTTP_CONTENT_RANGE': range}
1700

    
1701
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1702
        r = self.post(url, data=get_random_data(), **kwargs)
1703

    
1704
        self.assertEqual(r.status_code, 416)
1705

    
1706
    def test_update_object_out_of_limits(self):
1707
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1708
        oname, odata = self.upload_object(
1709
            self.container, length=random.randint(block_size + 1,
1710
                                                  2 * block_size))[:2]
1711

    
1712
        length = len(odata)
1713
        first_byte_pos = random.randint(1, block_size)
1714
        last_byte_pos = length + 1
1715
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1716
        kwargs = {'content_type': 'application/octet-stream',
1717
                  'HTTP_CONTENT_RANGE': range}
1718

    
1719
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1720
        r = self.post(url, data=get_random_data(), **kwargs)
1721

    
1722
        self.assertEqual(r.status_code, 416)
1723

    
1724
    def test_append(self):
1725
        data = get_random_data()
1726
        length = len(data)
1727
        url = join_urls(self.pithos_path, self.user, self.container,
1728
                        self.object)
1729
        r = self.post(url, data=data, content_type='application/octet-stream',
1730
                      HTTP_CONTENT_LENGTH=str(length),
1731
                      HTTP_CONTENT_RANGE='bytes */*')
1732
        self.assertEqual(r.status_code, 204)
1733

    
1734
        r = self.get(url)
1735
        content = r.content
1736
        self.assertEqual(len(content), len(self.object_data) + length)
1737
        self.assertEqual(content, self.object_data + data)
1738

    
1739
    # TODO Fix the test
1740
    def _test_update_with_chunked_transfer(self):
1741
        data = get_random_data()
1742
        length = len(data)
1743

    
1744
        url = join_urls(self.pithos_path, self.user, self.container,
1745
                        self.object)
1746
        r = self.post(url, data=data, content_type='application/octet-stream',
1747
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1748
                      HTTP_TRANSFER_ENCODING='chunked')
1749
        self.assertEqual(r.status_code, 204)
1750

    
1751
        # check modified object
1752
        r = self.get(url)
1753
        content = r.content
1754
        self.assertEqual(content[0:length], data)
1755
        self.assertEqual(content[length:], self.object_data[length:])
1756

    
1757
    def test_update_from_other_object(self):
1758
        src = self.object
1759
        dest = get_random_name()
1760

    
1761
        url = join_urls(self.pithos_path, self.user, self.container, src)
1762
        r = self.get(url)
1763
        source_data = r.content
1764
        source_meta = self.get_object_info(self.container, src)
1765

    
1766
        # update zero length object
1767
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1768
        r = self.put(url, data='')
1769
        self.assertEqual(r.status_code, 201)
1770

    
1771
        r = self.post(url,
1772
                      HTTP_CONTENT_RANGE='bytes */*',
1773
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1774
        self.assertEqual(r.status_code, 204)
1775

    
1776
        r = self.get(url)
1777
        dest_data = r.content
1778
        dest_meta = self.get_object_info(self.container, dest)
1779

    
1780
        self.assertEqual(source_data, dest_data)
1781
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1782
        self.assertEqual(source_meta['X-Object-Hash'],
1783
                         dest_meta['X-Object-Hash'])
1784
        self.assertTrue(
1785
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1786

    
1787
    def test_update_range_from_other_object(self):
1788
        src = self.object
1789
        dest = get_random_name()
1790

    
1791
        url = join_urls(self.pithos_path, self.user, self.container, src)
1792
        r = self.get(url)
1793
        source_data = r.content
1794
        source_length = len(source_data)
1795

    
1796
        # update zero length object
1797
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1798
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1799
        initial_data = get_random_data(
1800
            length=source_length + random.randint(0, block_size))
1801

    
1802
        r = self.put(url, data=initial_data)
1803
        self.assertEqual(r.status_code, 201)
1804

    
1805
        offset = random.randint(0, source_length - 1)
1806
        upto = random.randint(offset, source_length - 1)
1807
        r = self.post(url,
1808
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1809
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1810
        self.assertEqual(r.status_code, 204)
1811

    
1812
        r = self.get(url)
1813
        content = r.content
1814
        self.assertEqual(content, (initial_data[:offset] +
1815
                                   source_data[:upto - offset + 1] +
1816
                                   initial_data[upto + 1:]))
1817

    
1818
    def test_update_range_from_invalid_other_object(self):
1819
        src = self.object
1820
        dest = get_random_name()
1821

    
1822
        url = join_urls(self.pithos_path, self.user, self.container, src)
1823
        r = self.get(url)
1824

    
1825
        # update zero length object
1826
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1827
        initial_data = get_random_data()
1828
        length = len(initial_data)
1829
        r = self.put(url, data=initial_data)
1830
        self.assertEqual(r.status_code, 201)
1831

    
1832
        offset = random.randint(1, length - 2)
1833
        upto = random.randint(offset, length - 1)
1834

    
1835
        # source object does not start with /
1836
        r = self.post(url,
1837
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1838
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1839
        self.assertEqual(r.status_code, 400)
1840

    
1841
        # source object does not exist
1842
        r = self.post(url,
1843
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1844
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1845
        self.assertEqual(r.status_code, 404)
1846

    
1847
    def test_restore_version(self):
1848
        info = self.get_object_info(self.container, self.object)
1849
        v = []
1850
        append = v.append
1851
        append((info['X-Object-Version'],
1852
                int(info['Content-Length']),
1853
                self.object_data))
1854

    
1855
        # update object
1856
        data, r = self.upload_object(self.container, self.object,
1857
                                     length=v[0][1] - 1)[1:]
1858
        self.assertTrue('X-Object-Version' in r)
1859
        append((r['X-Object-Version'], len(data), data))
1860
        # v[0][1] > v[1][1]
1861

    
1862
        # update with the previous version
1863
        url = join_urls(self.pithos_path, self.user, self.container,
1864
                        self.object)
1865
        r = self.post(url,
1866
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1867
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1868
                                                       self.object),
1869
                      HTTP_X_SOURCE_VERSION=v[0][0],
1870
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1871
        self.assertEqual(r.status_code, 204)
1872
        # v[2][1] = v[0][1] > v[1][1]
1873

    
1874
        # check content
1875
        r = self.get(url)
1876
        content = r.content
1877
        self.assertEqual(len(content), v[0][1])
1878
        self.assertEqual(content, self.object_data)
1879
        append((r['X-Object-Version'], len(content), content))
1880

    
1881
        # update object content(v4) > content(v2)
1882
        data, r = self.upload_object(self.container, self.object,
1883
                                     length=v[2][1] + 1)[1:]
1884
        self.assertTrue('X-Object-Version' in r)
1885
        append((r['X-Object-Version'], len(data), data))
1886
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1887

    
1888
        # update with the previous version
1889
        r = self.post(url,
1890
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1891
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1892
                                                       self.object),
1893
                      HTTP_X_SOURCE_VERSION=v[2][0],
1894
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1895
        self.assertEqual(r.status_code, 204)
1896
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1897

    
1898
        # check content
1899
        r = self.get(url)
1900
        data = r.content
1901
        self.assertEqual(data, v[2][2])
1902
        append((r['X-Object-Version'], len(data), data))
1903

    
1904
    def test_update_from_other_version(self):
1905
        versions = []
1906
        info = self.get_object_info(self.container, self.object)
1907
        versions.append(info['X-Object-Version'])
1908
        pre_length = int(info['Content-Length'])
1909

    
1910
        # update object
1911
        d1, r = self.upload_object(self.container, self.object,
1912
                                   length=pre_length - 1)[1:]
1913
        self.assertTrue('X-Object-Version' in r)
1914
        versions.append(r['X-Object-Version'])
1915

    
1916
        # update object
1917
        d2, r = self.upload_object(self.container, self.object,
1918
                                   length=pre_length - 2)[1:]
1919
        self.assertTrue('X-Object-Version' in r)
1920
        versions.append(r['X-Object-Version'])
1921

    
1922
        # get previous version
1923
        url = join_urls(self.pithos_path, self.user, self.container,
1924
                        self.object)
1925
        r = self.get('%s?version=list&format=json' % url)
1926
        self.assertEqual(r.status_code, 200)
1927
        l = json.loads(r.content)['versions']
1928
        self.assertEqual(len(l), 3)
1929
        self.assertEqual([str(v[0]) for v in l], versions)
1930

    
1931
        # update with the previous version
1932
        r = self.post(url,
1933
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1934
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1935
                                                       self.object),
1936
                      HTTP_X_SOURCE_VERSION=versions[0])
1937
        self.assertEqual(r.status_code, 204)
1938

    
1939
        # check content
1940
        r = self.get(url)
1941
        content = r.content
1942
        self.assertEqual(len(content), pre_length)
1943
        self.assertEqual(content, self.object_data)
1944

    
1945
        # update object
1946
        d3, r = self.upload_object(self.container, self.object,
1947
                                   length=len(d2) + 1)[1:]
1948
        self.assertTrue('X-Object-Version' in r)
1949
        versions.append(r['X-Object-Version'])
1950

    
1951
        # update with the previous version
1952
        r = self.post(url,
1953
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1954
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1955
                                                       self.object),
1956
                      HTTP_X_SOURCE_VERSION=versions[-2])
1957
        self.assertEqual(r.status_code, 204)
1958

    
1959
        # check content
1960
        r = self.get(url)
1961
        content = r.content
1962
        self.assertEqual(content, d2 + d3[-1])
1963

    
1964
    def test_update_invalid_permissions(self):
1965
        url = join_urls(self.pithos_path, self.user, self.container,
1966
                        self.object)
1967
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1968
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1969
        self.assertEqual(r.status_code, 400)
1970

    
1971
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1972
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1973
        self.assertEqual(r.status_code, 400)
1974

    
1975
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1976
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1977
        self.assertEqual(r.status_code, 400)
1978

    
1979

    
1980
class ObjectDelete(PithosAPITest):
1981
    def setUp(self):
1982
        PithosAPITest.setUp(self)
1983
        self.container = 'c1'
1984
        self.create_container(self.container)
1985
        self.object, self.object_data = self.upload_object(self.container)[:2]
1986

    
1987
    def test_delete(self):
1988
        url = join_urls(self.pithos_path, self.user, self.container,
1989
                        self.object)
1990
        r = self.delete(url)
1991
        self.assertEqual(r.status_code, 204)
1992

    
1993
        r = self.head(url)
1994
        self.assertEqual(r.status_code, 404)
1995

    
1996
    def test_delete_non_existent(self):
1997
        url = join_urls(self.pithos_path, self.user, self.container,
1998
                        get_random_name())
1999
        r = self.delete(url)
2000
        self.assertEqual(r.status_code, 404)
2001

    
2002
    @pithos_test_settings(API_LIST_LIMIT=10)
2003
    def test_delete_dir(self):
2004
        folder = self.create_folder(self.container)[0]
2005
        subfolder = self.create_folder(
2006
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
2007
        objects = [subfolder]
2008
        append = objects.append
2009
        for i in range(11):
2010
            append(self.upload_object(self.container,
2011
                                      '%s/%d' % (folder, i),
2012
                                      depth='1')[0])
2013
        other = self.upload_object(self.container, strnextling(folder))[0]
2014

    
2015
        # move dir
2016
        url = join_urls(self.pithos_path, self.user, self.container, folder)
2017
        r = self.delete('%s?delimiter=/' % url)
2018
        self.assertEqual(r.status_code, 204)
2019

    
2020
        for obj in objects:
2021
            # assert object does not exist
2022
            url = join_urls(self.pithos_path, self.user, self.container, obj)
2023
            r = self.head(url)
2024
            self.assertEqual(r.status_code, 404)
2025

    
2026
        # assert other has not been deleted
2027
        url = join_urls(self.pithos_path, self.user, self.container, other)
2028
        r = self.head(url)
2029
        self.assertEqual(r.status_code, 200)