Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 0c6ab9df

History | View | Annotate | Download (77.7 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote, unquote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS, pithos_test_settings)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def test_get_object_meta(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, self.user, cname, oname)
68
        r = self.head(url)
69

    
70
        mandatory = ['Etag',
71
                     'Content-Length',
72
                     'Content-Type',
73
                     'Last-Modified',
74
                     'X-Object-Hash',
75
                     'X-Object-UUID',
76
                     'X-Object-Version',
77
                     'X-Object-Version-Timestamp',
78
                     'X-Object-Modified-By']
79
        for i in mandatory:
80
            self.assertTrue(i in r)
81

    
82
        r = self.post(url, content_type='',
83
                      HTTP_CONTENT_ENCODING='gzip',
84
                      HTTP_CONTENT_DISPOSITION=(
85
                          'attachment; filename="%s"' % oname))
86
        self.assertEqual(r.status_code, 202)
87

    
88
        r = self.head(url)
89
        for i in mandatory:
90
            self.assertTrue(i in r)
91
        self.assertTrue('Content-Encoding' in r)
92
        self.assertEqual(r['Content-Encoding'], 'gzip')
93
        self.assertTrue('Content-Disposition' in r)
94
        self.assertEqual(unquote(r['Content-Disposition']),
95
                         'attachment; filename="%s"' % oname)
96

    
97
        prefix = 'myobject/'
98
        data = ''
99
        for i in range(random.randint(2, 10)):
100
            part = '%s%d' % (prefix, i)
101
            data += self.upload_object(cname, oname=part)[1]
102

    
103
        manifest = '%s/%s' % (cname, prefix)
104
        oname = get_random_name()
105
        url = join_urls(self.pithos_path, self.user, cname, oname)
106
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107
        self.assertEqual(r.status_code, 201)
108

    
109
        r = self.head(url)
110
        for i in mandatory:
111
            self.assertTrue(i in r)
112
        self.assertTrue('X-Object-Manifest' in r)
113
        self.assertEqual(r['X-Object-Manifest'], manifest)
114

    
115

    
116
class ObjectGet(PithosAPITest):
117
    def setUp(self):
118
        PithosAPITest.setUp(self)
119
        self.containers = ['c1', 'c2']
120

    
121
        # create some containers
122
        for c in self.containers:
123
            self.create_container(c)
124

    
125
        # upload files
126
        self.objects = defaultdict(list)
127
        self.objects['c1'].append(self.upload_object('c1')[0])
128

    
129
    def test_versions(self):
130
        c = 'c1'
131
        o = self.objects[c][0]
132
        url = join_urls(self.pithos_path, self.user, c, o)
133

    
134
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135
        r = self.post(url, content_type='', **meta)
136
        self.assertEqual(r.status_code, 202)
137

    
138
        url = join_urls(self.pithos_path, self.user, c, o)
139
        r = self.get('%s?version=list&format=json' % url)
140
        self.assertEqual(r.status_code, 200)
141
        l1 = json.loads(r.content)['versions']
142
        self.assertEqual(len(l1), 2)
143

    
144
        # update meta
145
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147
        r = self.post(url, content_type='', **meta)
148
        self.assertEqual(r.status_code, 202)
149

    
150
        # assert a newly created version has been created
151
        r = self.get('%s?version=list&format=json' % url)
152
        self.assertEqual(r.status_code, 200)
153
        l2 = json.loads(r.content)['versions']
154
        self.assertEqual(len(l2), len(l1) + 1)
155
        self.assertEqual(l2[:-1], l1)
156

    
157
        vserial, _ = l2[-2]
158
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159
                         {'Quality': 'AAA'})
160

    
161
        # update data
162
        self.append_object_data(c, o)
163

    
164
        # assert a newly created version has been created
165
        r = self.get('%s?version=list&format=json' % url)
166
        self.assertEqual(r.status_code, 200)
167
        l3 = json.loads(r.content)['versions']
168
        self.assertEqual(len(l3), len(l2) + 1)
169
        self.assertEqual(l3[:-1], l2)
170

    
171
    def test_get_version(self):
172
        c = 'c1'
173
        o = self.objects[c][0]
174
        url = join_urls(self.pithos_path, self.user, c, o)
175

    
176
        # Update metadata
177
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178
        r = self.post(url, content_type='', **meta)
179
        self.assertEqual(r.status_code, 202)
180

    
181
        url = join_urls(self.pithos_path, self.user, c, o)
182
        r = self.get('%s?version=list&format=json' % url)
183
        self.assertEqual(r.status_code, 200)
184
        l = json.loads(r.content)['versions']
185
        self.assertEqual(len(l), 2)
186

    
187
        r = self.head('%s?version=%s' % (url, l[0][0]))
188
        self.assertEqual(r.status_code, 200)
189
        self.assertTrue('X-Object-Meta-Quality' not in r)
190

    
191
        r = self.head('%s?version=%s' % (url, l[1][0]))
192
        self.assertEqual(r.status_code, 200)
193
        self.assertTrue('X-Object-Meta-Quality' in r)
194

    
195
        # test invalid version
196
        r = self.head('%s?version=-1' % url)
197
        self.assertEqual(r.status_code, 404)
198

    
199
        other_name, other_data, r = self.upload_object(c)
200
        self.assertTrue('X-Object-Version' in r)
201
        other_version = r['X-Object-Version']
202

    
203
        self.assertTrue(o != other_name)
204

    
205
        r = self.get('%s?version=%s' % (url, other_version))
206
        self.assertEqual(r.status_code, 404)
207

    
208
        r = self.head('%s?version=%s' % (url, other_version))
209
        self.assertEqual(r.status_code, 404)
210

    
211
    def test_objects_with_trailing_spaces(self):
212
        # create object
213
        oname = self.upload_object('c1')[0]
214
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215

    
216
        r = self.get(quote('%s ' % url))
217
        self.assertEqual(r.status_code, 404)
218

    
219
        # delete object
220
        self.delete(url)
221

    
222
        r = self.get(url)
223
        self.assertEqual(r.status_code, 404)
224

    
225
        # upload object with trailing space
226
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227

    
228
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229
        r = self.get(url)
230
        self.assertEqual(r.status_code, 200)
231

    
232
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233
        r = self.get(url)
234
        self.assertEqual(r.status_code, 404)
235

    
236
    def test_get_partial(self):
237
        cname = self.containers[0]
238
        oname, odata = self.upload_object(cname, length=512)[:-1]
239
        url = join_urls(self.pithos_path, self.user, cname, oname)
240
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241
        self.assertEqual(r.status_code, 206)
242
        data = r.content
243
        self.assertEqual(data, odata[:500])
244
        self.assertTrue('Content-Range' in r)
245
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246
        self.assertTrue('Content-Type' in r)
247
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248

    
249
    def test_get_final_500(self):
250
        cname = self.containers[0]
251
        oname, odata = self.upload_object(cname, length=512)[:-1]
252
        size = len(odata)
253
        url = join_urls(self.pithos_path, self.user, cname, oname)
254
        r = self.get(url, HTTP_RANGE='bytes=-500')
255
        self.assertEqual(r.status_code, 206)
256
        self.assertEqual(r.content, odata[-500:])
257
        self.assertTrue('Content-Range' in r)
258
        self.assertEqual(r['Content-Range'],
259
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260
        self.assertTrue('Content-Type' in r)
261
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262

    
263
    def test_get_rest(self):
264
        cname = self.containers[0]
265
        oname, odata = self.upload_object(cname, length=512)[:-1]
266
        size = len(odata)
267
        url = join_urls(self.pithos_path, self.user, cname, oname)
268
        offset = len(odata) - random.randint(1, 512)
269
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270
        self.assertEqual(r.status_code, 206)
271
        self.assertEqual(r.content, odata[offset:])
272
        self.assertTrue('Content-Range' in r)
273
        self.assertEqual(r['Content-Range'],
274
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275
        self.assertTrue('Content-Type' in r)
276
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277

    
278
    def test_get_range_not_satisfiable(self):
279
        cname = self.containers[0]
280
        oname, odata = self.upload_object(cname, length=512)[:-1]
281
        url = join_urls(self.pithos_path, self.user, cname, oname)
282

    
283
        # TODO
284
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285
        #self.assertEqual(r.status_code, 416)
286

    
287
        offset = len(odata) + 1
288
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289
        self.assertEqual(r.status_code, 416)
290

    
291
    def test_multiple_range(self):
292
        cname = self.containers[0]
293
        oname, odata = self.upload_object(cname)[:-1]
294
        url = join_urls(self.pithos_path, self.user, cname, oname)
295

    
296
        l = ['0-499', '-500', '1000-']
297
        ranges = 'bytes=%s' % ','.join(l)
298
        r = self.get(url, HTTP_RANGE=ranges)
299
        self.assertEqual(r.status_code, 206)
300
        self.assertTrue('content-type' in r)
301
        p = re.compile(
302
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303
            re.I)
304
        m = p.match(r['content-type'])
305
        if m is None:
306
            self.fail('Invalid multiple range content type')
307
        boundary = m.groupdict()['boundary']
308
        cparts = r.content.split('--%s' % boundary)[1:-1]
309

    
310
        # assert content parts length
311
        self.assertEqual(len(cparts), len(l))
312

    
313
        # for each content part assert headers
314
        i = 0
315
        for cpart in cparts:
316
            content = cpart.split('\r\n')
317
            headers = content[1:3]
318
            content_range = headers[0].split(': ')
319
            self.assertEqual(content_range[0], 'Content-Range')
320

    
321
            r = l[i].split('-')
322
            if not r[0] and not r[1]:
323
                pass
324
            elif not r[0]:
325
                start = len(odata) - int(r[1])
326
                end = len(odata)
327
            elif not r[1]:
328
                start = int(r[0])
329
                end = len(odata)
330
            else:
331
                start = int(r[0])
332
                end = int(r[1]) + 1
333
            fdata = odata[start:end]
334
            sdata = '\r\n'.join(content[4:-1])
335
            self.assertEqual(len(fdata), len(sdata))
336
            self.assertEquals(fdata, sdata)
337
            i += 1
338

    
339
    def test_multiple_range_not_satisfiable(self):
340
        # perform get with multiple range
341
        cname = self.containers[0]
342
        oname, odata = self.upload_object(cname)[:-1]
343
        out_of_range = len(odata) + 1
344
        l = ['0-499', '-500', '%d-' % out_of_range]
345
        ranges = 'bytes=%s' % ','.join(l)
346
        url = join_urls(self.pithos_path, self.user, cname, oname)
347
        r = self.get(url, HTTP_RANGE=ranges)
348
        self.assertEqual(r.status_code, 416)
349

    
350
    def test_get_if_match(self):
351
        cname = self.containers[0]
352
        oname, odata = self.upload_object(cname)[:-1]
353

    
354
        # perform get with If-Match
355
        url = join_urls(self.pithos_path, self.user, cname, oname)
356

    
357
        if pithos_settings.UPDATE_MD5:
358
            etag = md5_hash(odata)
359
        else:
360
            etag = merkle(odata)
361

    
362
        r = self.get(url, HTTP_IF_MATCH=etag)
363

    
364
        # assert get success
365
        self.assertEqual(r.status_code, 200)
366

    
367
        # assert response content
368
        self.assertEqual(r.content, odata)
369

    
370
    def test_get_if_match_star(self):
371
        cname = self.containers[0]
372
        oname, odata = self.upload_object(cname)[:-1]
373

    
374
        # perform get with If-Match *
375
        url = join_urls(self.pithos_path, self.user, cname, oname)
376
        r = self.get(url, HTTP_IF_MATCH='*')
377

    
378
        # assert get success
379
        self.assertEqual(r.status_code, 200)
380

    
381
        # assert response content
382
        self.assertEqual(r.content, odata)
383

    
384
    def test_get_multiple_if_match(self):
385
        cname = self.containers[0]
386
        oname, odata = self.upload_object(cname)[:-1]
387

    
388
        # perform get with If-Match
389
        url = join_urls(self.pithos_path, self.user, cname, oname)
390

    
391
        if pithos_settings.UPDATE_MD5:
392
            etag = md5_hash(odata)
393
        else:
394
            etag = merkle(odata)
395

    
396
        quoted = lambda s: '"%s"' % s
397
        r = self.get(url, HTTP_IF_MATCH=','.join(
398
            [quoted(etag), quoted(get_random_data(64))]))
399

    
400
        # assert get success
401
        self.assertEqual(r.status_code, 200)
402

    
403
        # assert response content
404
        self.assertEqual(r.content, odata)
405

    
406
    def test_if_match_precondition_failed(self):
407
        cname = self.containers[0]
408
        oname, odata = self.upload_object(cname)[:-1]
409

    
410
        # perform get with If-Match
411
        url = join_urls(self.pithos_path, self.user, cname, oname)
412
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413
        self.assertEqual(r.status_code, 412)
414

    
415
    def test_if_none_match(self):
416
        # upload object
417
        cname = self.containers[0]
418
        oname, odata = self.upload_object(cname)[:-1]
419

    
420
        if pithos_settings.UPDATE_MD5:
421
            etag = md5_hash(odata)
422
        else:
423
            etag = merkle(odata)
424

    
425
        # perform get with If-None-Match
426
        url = join_urls(self.pithos_path, self.user, cname, oname)
427
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428

    
429
        # assert precondition_failed
430
        self.assertEqual(r.status_code, 304)
431

    
432
        # update object data
433
        r = self.append_object_data(cname, oname)[-1]
434
        self.assertTrue(etag != r['ETag'])
435

    
436
        # perform get with If-None-Match
437
        url = join_urls(self.pithos_path, self.user, cname, oname)
438
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439

    
440
        # assert get success
441
        self.assertEqual(r.status_code, 200)
442

    
443
    def test_if_none_match_star(self):
444
        # upload object
445
        cname = self.containers[0]
446
        oname, odata = self.upload_object(cname)[:-1]
447

    
448
        # perform get with If-None-Match with star
449
        url = join_urls(self.pithos_path, self.user, cname, oname)
450
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451

    
452
        self.assertEqual(r.status_code, 304)
453

    
454
    def test_if_modified_since(self):
455
        # upload object
456
        cname = self.containers[0]
457
        oname, odata = self.upload_object(cname)[:-1]
458
        object_info = self.get_object_info(cname, oname)
459
        last_modified = object_info['Last-Modified']
460
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461
        t1_formats = map(t1.strftime, DATE_FORMATS)
462

    
463
        # Check not modified since
464
        url = join_urls(self.pithos_path, self.user, cname, oname)
465
        for t in t1_formats:
466
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467
            self.assertEqual(r.status_code, 304)
468

    
469
        _time.sleep(1)
470

    
471
        # update object data
472
        appended_data = self.append_object_data(cname, oname)[1]
473

    
474
        # Check modified since
475
        url = join_urls(self.pithos_path, self.user, cname, oname)
476
        for t in t1_formats:
477
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478
            self.assertEqual(r.status_code, 200)
479
            self.assertEqual(r.content, odata + appended_data)
480

    
481
    def test_if_modified_since_invalid_date(self):
482
        cname = self.containers[0]
483
        oname, odata = self.upload_object(cname)[:-1]
484
        url = join_urls(self.pithos_path, self.user, cname, oname)
485
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486
        self.assertEqual(r.status_code, 200)
487
        self.assertEqual(r.content, odata)
488

    
489
    def test_if_not_modified_since(self):
490
        cname = self.containers[0]
491
        oname, odata = self.upload_object(cname)[:-1]
492
        url = join_urls(self.pithos_path, self.user, cname, oname)
493
        object_info = self.get_object_info(cname, oname)
494
        last_modified = object_info['Last-Modified']
495
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496

    
497
        # Check unmodified
498
        t1 = t + datetime.timedelta(seconds=1)
499
        t1_formats = map(t1.strftime, DATE_FORMATS)
500
        for t in t1_formats:
501
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502
            self.assertEqual(r.status_code, 200)
503
            self.assertEqual(r.content, odata)
504

    
505
        # modify object
506
        _time.sleep(2)
507
        self.append_object_data(cname, oname)
508

    
509
        object_info = self.get_object_info(cname, oname)
510
        last_modified = object_info['Last-Modified']
511
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512
        t2 = t - datetime.timedelta(seconds=1)
513
        t2_formats = map(t2.strftime, DATE_FORMATS)
514

    
515
        # check modified
516
        for t in t2_formats:
517
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518
            self.assertEqual(r.status_code, 412)
519

    
520
        # modify account: update object meta
521
        _time.sleep(1)
522
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523

    
524
        object_info = self.get_object_info(cname, oname)
525
        last_modified = object_info['Last-Modified']
526
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527
        t3 = t - datetime.timedelta(seconds=1)
528
        t3_formats = map(t3.strftime, DATE_FORMATS)
529

    
530
        # check modified
531
        for t in t3_formats:
532
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533
            self.assertEqual(r.status_code, 412)
534

    
535
    def test_if_unmodified_since(self):
536
        cname = self.containers[0]
537
        oname, odata = self.upload_object(cname)[:-1]
538
        url = join_urls(self.pithos_path, self.user, cname, oname)
539
        object_info = self.get_object_info(cname, oname)
540
        last_modified = object_info['Last-Modified']
541
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542
        t = t + datetime.timedelta(seconds=1)
543
        t_formats = map(t.strftime, DATE_FORMATS)
544

    
545
        for tf in t_formats:
546
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547
            self.assertEqual(r.status_code, 200)
548
            self.assertEqual(r.content, odata)
549

    
550
    def test_if_unmodified_since_precondition_failed(self):
551
        cname = self.containers[0]
552
        oname, odata = self.upload_object(cname)[:-1]
553
        url = join_urls(self.pithos_path, self.user, cname, oname)
554
        object_info = self.get_object_info(cname, oname)
555
        last_modified = object_info['Last-Modified']
556
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557
        t = t - datetime.timedelta(seconds=1)
558
        t_formats = map(t.strftime, DATE_FORMATS)
559

    
560
        for tf in t_formats:
561
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562
            self.assertEqual(r.status_code, 412)
563

    
564
    def test_hashes(self):
565
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566
        cname = self.containers[0]
567
        oname, odata = self.upload_object(cname, length=l)[:-1]
568
        size = len(odata)
569

    
570
        url = join_urls(self.pithos_path, self.user, cname, oname)
571
        r = self.get('%s?format=json&hashmap' % url)
572
        self.assertEqual(r.status_code, 200)
573
        body = json.loads(r.content)
574

    
575
        hashes = body['hashes']
576
        block_size = body['block_size']
577
        block_num = size / block_size if size / block_size == 0 else\
578
            size / block_size + 1
579
        self.assertTrue(len(hashes), block_num)
580
        i = 0
581
        for h in hashes:
582
            start = i * block_size
583
            end = (i + 1) * block_size
584
            hash = merkle(odata[start:end])
585
            self.assertEqual(h, hash)
586
            i += 1
587

    
588

    
589
class ObjectPut(PithosAPITest):
590
    def setUp(self):
591
        PithosAPITest.setUp(self)
592
        self.container = get_random_name()
593
        self.create_container(self.container)
594

    
595
    def test_upload(self):
596
        cname = self.container
597
        oname = get_random_name()
598
        data = get_random_data()
599
        meta = {'test': 'test1'}
600
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601
                       for k, v in meta.iteritems())
602
        url = join_urls(self.pithos_path, self.user, cname, oname)
603
        r = self.put(url, data=data, content_type='application/pdf', **headers)
604
        self.assertEqual(r.status_code, 201)
605
        self.assertTrue('ETag' in r)
606
        self.assertTrue('X-Object-Version' in r)
607

    
608
        info = self.get_object_info(cname, oname)
609

    
610
        # assert object meta
611
        self.assertTrue('X-Object-Meta-Test' in info)
612
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
613

    
614
        # assert content-type
615
        self.assertEqual(info['content-type'], 'application/pdf')
616

    
617
        # assert uploaded content
618
        r = self.get(url)
619
        self.assertEqual(r.status_code, 200)
620
        self.assertEqual(r.content, data)
621

    
622
    def test_maximum_upload_size_exceeds(self):
623
        cname = self.container
624
        oname = get_random_name()
625

    
626
        # set container quota to 100
627
        url = join_urls(self.pithos_path, self.user, cname)
628
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
629
        self.assertEqual(r.status_code, 202)
630

    
631
        info = self.get_container_info(cname)
632
        length = int(info['X-Container-Policy-Quota']) + 1
633

    
634
        data = get_random_data(length)
635
        url = join_urls(self.pithos_path, self.user, cname, oname)
636
        r = self.put(url, data=data)
637
        self.assertEqual(r.status_code, 413)
638

    
639
    def test_upload_with_name_containing_slash(self):
640
        cname = self.container
641
        oname = '/%s' % get_random_name()
642
        data = get_random_data()
643
        url = join_urls(self.pithos_path, self.user, cname, oname)
644
        r = self.put(url, data=data)
645
        self.assertEqual(r.status_code, 201)
646
        self.assertTrue('ETag' in r)
647
        self.assertTrue('X-Object-Version' in r)
648

    
649
        r = self.get(url)
650
        self.assertEqual(r.status_code, 200)
651
        self.assertEqual(r.content, data)
652

    
653
    def test_upload_unprocessable_entity(self):
654
        cname = self.container
655
        oname = get_random_name()
656
        data = get_random_data()
657
        url = join_urls(self.pithos_path, self.user, cname, oname)
658
        r = self.put(url, data=data, HTTP_ETAG='123')
659
        self.assertEqual(r.status_code, 422)
660

    
661
#    def test_chunked_transfer(self):
662
#        cname = self.container
663
#        oname = '/%s' % get_random_name()
664
#        data = get_random_data()
665
#        url = join_urls(self.pithos_path, self.user, cname, oname)
666
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
667
#        self.assertEqual(r.status_code, 201)
668
#        self.assertTrue('ETag' in r)
669
#        self.assertTrue('X-Object-Version' in r)
670

    
671
    def test_manifestation(self):
672
        cname = self.container
673
        prefix = 'myobject/'
674
        data = ''
675
        for i in range(random.randint(2, 10)):
676
            part = '%s%d' % (prefix, i)
677
            data += self.upload_object(cname, oname=part)[1]
678

    
679
        manifest = '%s/%s' % (cname, prefix)
680
        oname = get_random_name()
681
        url = join_urls(self.pithos_path, self.user, cname, oname)
682
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
683
        self.assertEqual(r.status_code, 201)
684

    
685
        # assert object exists
686
        r = self.get(url)
687
        self.assertEqual(r.status_code, 200)
688

    
689
        # assert its content
690
        self.assertEqual(r.content, data)
691

    
692
        # invalid manifestation
693
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
694
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
695
        r = self.get(url)
696
        self.assertEqual(r.content, '')
697

    
698
    def test_create_zero_length_object(self):
699
        cname = self.container
700
        oname = get_random_name()
701
        url = join_urls(self.pithos_path, self.user, cname, oname)
702
        r = self.put(url, data='')
703
        self.assertEqual(r.status_code, 201)
704

    
705
        r = self.get(url)
706
        self.assertEqual(r.status_code, 200)
707
        self.assertEqual(int(r['Content-Length']), 0)
708
        self.assertEqual(r.content, '')
709

    
710
        r = self.get('%s?hashmap=&format=json' % url)
711
        self.assertEqual(r.status_code, 200)
712
        body = json.loads(r.content)
713
        hashes = body['hashes']
714
        hash = merkle('')
715
        self.assertEqual(hashes, [hash])
716

    
717
    def test_create_object_by_hashmap(self):
718
        cname = self.container
719
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
720

    
721
        # upload an object
722
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
723
        # get it hashmap
724
        url = join_urls(self.pithos_path, self.user, cname, oname)
725
        r = self.get('%s?hashmap=&format=json' % url)
726

    
727
        oname = get_random_name()
728
        url = join_urls(self.pithos_path, self.user, cname, oname)
729
        r = self.put('%s?hashmap=' % url, data=r.content)
730
        self.assertEqual(r.status_code, 201)
731

    
732
        r = self.get(url)
733
        self.assertEqual(r.status_code, 200)
734
        self.assertEqual(r.content, data)
735

    
736
    def test_create_object_by_invalid_hashmap(self):
737
        cname = self.container
738
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
739

    
740
        # upload an object
741
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
742
        # get it hashmap
743
        url = join_urls(self.pithos_path, self.user, cname, oname)
744
        r = self.get('%s?hashmap=&format=json' % url)
745
        data = r.content
746
        try:
747
            hashmap = json.loads(data)
748
        except:
749
            self.fail('JSON format expected')
750

    
751
        oname = get_random_name()
752
        url = join_urls(self.pithos_path, self.user, cname, oname)
753
        hashmap['hashes'] = [get_random_name()]
754
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
755
        self.assertEqual(r.status_code, 400)
756

    
757

    
758
class ObjectPutCopy(PithosAPITest):
759
    def setUp(self):
760
        PithosAPITest.setUp(self)
761
        self.container = 'c1'
762
        self.create_container(self.container)
763
        self.object, self.data = self.upload_object(self.container)[:-1]
764

    
765
        url = join_urls(
766
            self.pithos_path, self.user, self.container, self.object)
767
        r = self.head(url)
768
        self.etag = r['X-Object-Hash']
769

    
770
    def test_copy(self):
771
        with AssertMappingInvariant(self.get_object_info, self.container,
772
                                    self.object):
773
            # copy object
774
            oname = get_random_name()
775
            url = join_urls(self.pithos_path, self.user, self.container, oname)
776
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
777
                         HTTP_X_COPY_FROM='/%s/%s' % (
778
                             self.container, self.object))
779

    
780
            # assert copy success
781
            self.assertEqual(r.status_code, 201)
782

    
783
            # assert access the new object
784
            r = self.head(url)
785
            self.assertEqual(r.status_code, 200)
786
            self.assertTrue('X-Object-Meta-Test' in r)
787
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
788

    
789
            # assert etag is the same
790
            self.assertTrue('X-Object-Hash' in r)
791
            self.assertEqual(r['X-Object-Hash'], self.etag)
792

    
793
    def test_copy_from_different_container(self):
794
        cname = 'c2'
795
        self.create_container(cname)
796
        with AssertMappingInvariant(self.get_object_info, self.container,
797
                                    self.object):
798
            oname = get_random_name()
799
            url = join_urls(self.pithos_path, self.user, cname, oname)
800
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
801
                         HTTP_X_COPY_FROM='/%s/%s' % (
802
                             self.container, self.object))
803

    
804
            # assert copy success
805
            self.assertEqual(r.status_code, 201)
806

    
807
            # assert access the new object
808
            r = self.head(url)
809
            self.assertEqual(r.status_code, 200)
810
            self.assertTrue('X-Object-Meta-Test' in r)
811
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
812

    
813
            # assert etag is the same
814
            self.assertTrue('X-Object-Hash' in r)
815
            self.assertEqual(r['X-Object-Hash'], self.etag)
816

    
817
    def test_copy_from_other_account(self):
818
        cname = 'c2'
819
        self.create_container(cname, user='chuck')
820
        self.create_container(cname, user='alice')
821

    
822
        # share object for read with alice
823
        url = join_urls(self.pithos_path, self.user, self.container,
824
                        self.object)
825
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
826
                      HTTP_X_OBJECT_SHARING='read=alice')
827
        self.assertEqual(r.status_code, 202)
828

    
829
        # assert not allowed for chuck
830
        oname = get_random_name()
831
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
832
        r = self.put(url, data='', user='chuck',
833
                     HTTP_X_OBJECT_META_TEST='testcopy',
834
                     HTTP_X_COPY_FROM='/%s/%s' % (
835
                         self.container, self.object),
836
                     HTTP_X_SOURCE_ACCOUNT='user')
837

    
838
        self.assertEqual(r.status_code, 403)
839

    
840
        # assert copy success for alice
841
        url = join_urls(self.pithos_path, 'alice', cname, oname)
842
        r = self.put(url, data='', user='alice',
843
                     HTTP_X_OBJECT_META_TEST='testcopy',
844
                     HTTP_X_COPY_FROM='/%s/%s' % (
845
                         self.container, self.object),
846
                     HTTP_X_SOURCE_ACCOUNT='user')
847
        self.assertEqual(r.status_code, 201)
848

    
849
        # assert access the new object
850
        r = self.head(url, user='alice')
851
        self.assertEqual(r.status_code, 200)
852
        self.assertTrue('X-Object-Meta-Test' in r)
853
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
854

    
855
        # assert etag is the same
856
        self.assertTrue('X-Object-Hash' in r)
857
        self.assertEqual(r['X-Object-Hash'], self.etag)
858

    
859
        # share object for write
860
        url = join_urls(self.pithos_path, self.user, self.container,
861
                        self.object)
862
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
863
                      HTTP_X_OBJECT_SHARING='write=dan')
864
        self.assertEqual(r.status_code, 202)
865

    
866
        # assert not allowed copy for alice
867
        url = join_urls(self.pithos_path, 'alice', cname, oname)
868
        r = self.put(url, data='', user='alice',
869
                     HTTP_X_OBJECT_META_TEST='testcopy',
870
                     HTTP_X_COPY_FROM='/%s/%s' % (
871
                         self.container, self.object),
872
                     HTTP_X_SOURCE_ACCOUNT='user')
873
        self.assertEqual(r.status_code, 403)
874

    
875
        # assert allowed copy for dan
876
        self.create_container(cname, user='dan')
877
        url = join_urls(self.pithos_path, 'dan', cname, oname)
878
        r = self.put(url, data='', user='dan',
879
                     HTTP_X_OBJECT_META_TEST='testcopy',
880
                     HTTP_X_COPY_FROM='/%s/%s' % (
881
                         self.container, self.object),
882
                     HTTP_X_SOURCE_ACCOUNT='user')
883
        self.assertEqual(r.status_code, 201)
884

    
885
        # assert access the new object
886
        r = self.head(url, user='dan')
887
        self.assertEqual(r.status_code, 200)
888
        self.assertTrue('X-Object-Meta-Test' in r)
889
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
890

    
891
        # assert etag is the same
892
        self.assertTrue('X-Object-Hash' in r)
893
        self.assertEqual(r['X-Object-Hash'], self.etag)
894

    
895
        # assert source object still exists
896
        url = join_urls(self.pithos_path, self.user, self.container,
897
                        self.object)
898
        r = self.head(url)
899
        self.assertEqual(r.status_code, 200)
900
        # assert etag is the same
901
        self.assertTrue('X-Object-Hash' in r)
902
        self.assertEqual(r['X-Object-Hash'], self.etag)
903

    
904
        r = self.get(url)
905
        self.assertEqual(r.status_code, 200)
906
        # assert etag is the same
907
        self.assertTrue('X-Object-Hash' in r)
908
        self.assertEqual(r['X-Object-Hash'], self.etag)
909

    
910
    def test_copy_invalid(self):
911
        # copy from non-existent object
912
        oname = get_random_name()
913
        url = join_urls(self.pithos_path, self.user, self.container, oname)
914
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
915
                     HTTP_X_COPY_FROM='/%s/%s' % (
916
                         self.container, get_random_name()))
917
        self.assertEqual(r.status_code, 404)
918

    
919
        # copy from non-existent container
920
        oname = get_random_name()
921
        url = join_urls(self.pithos_path, self.user, self.container, oname)
922
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
923
                     HTTP_X_COPY_FROM='/%s/%s' % (
924
                         get_random_name(), self.object))
925
        self.assertEqual(r.status_code, 404)
926

    
927
    @pithos_test_settings(API_LIST_LIMIT=10)
928
    def test_copy_dir(self):
929
        folder = self.create_folder(self.container)[0]
930
        subfolder = self.create_folder(
931
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
932
        objects = [subfolder]
933
        append = objects.append
934
        for i in range(11):
935
            append(self.upload_object(self.container,
936
                                      '%s/%d' % (folder, i),
937
                                      depth='1')[0])
938
        other = self.upload_object(self.container, strnextling(folder))[0]
939

    
940
        # copy dir
941
        copy_folder = self.create_folder(self.container)[0]
942
        url = join_urls(self.pithos_path, self.user, self.container,
943
                        copy_folder)
944
        r = self.put('%s?delimiter=/' % url, data='',
945
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
946
        self.assertEqual(r.status_code, 201)
947

    
948
        for obj in objects:
949
            # assert object exists
950
            url = join_urls(self.pithos_path, self.user, self.container,
951
                            obj.replace(folder, copy_folder))
952
            r = self.head(url)
953
            self.assertEqual(r.status_code, 200)
954

    
955
            # assert metadata
956
            meta = self.get_object_meta(self.container, obj)
957
            for k in meta.keys():
958
                key = 'X-Object-Meta-%s' % k
959
                self.assertTrue(key in r)
960
                self.assertEqual(r[key], meta[k])
961

    
962
        # assert other has not been created under copy folder
963
        url = join_urls(self.pithos_path, self.user, self.container,
964
                        '%s/%s' % (copy_folder,
965
                                   other.replace(folder, copy_folder)))
966
        r = self.head(url)
967
        self.assertEqual(r.status_code, 404)
968

    
969

    
970
class ObjectPutMove(PithosAPITest):
971
    def setUp(self):
972
        PithosAPITest.setUp(self)
973
        self.container = 'c1'
974
        self.create_container(self.container)
975
        self.object, self.data = self.upload_object(self.container)[:-1]
976

    
977
        url = join_urls(
978
            self.pithos_path, self.user, self.container, self.object)
979
        r = self.head(url)
980
        self.etag = r['X-Object-Hash']
981

    
982
    def test_move(self):
983
        # move object
984
        oname = get_random_name()
985
        url = join_urls(self.pithos_path, self.user, self.container, oname)
986
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
987
                     HTTP_X_MOVE_FROM='/%s/%s' % (
988
                         self.container, self.object))
989

    
990
        # assert move success
991
        self.assertEqual(r.status_code, 201)
992

    
993
        # assert access the new object
994
        r = self.head(url)
995
        self.assertEqual(r.status_code, 200)
996
        self.assertTrue('X-Object-Meta-Test' in r)
997
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
998

    
999
        # assert etag is the same
1000
        self.assertTrue('X-Object-Hash' in r)
1001

    
1002
        # assert the initial object has been deleted
1003
        url = join_urls(self.pithos_path, self.user, self.container,
1004
                        self.object)
1005
        r = self.head(url)
1006
        self.assertEqual(r.status_code, 404)
1007

    
1008
    @pithos_test_settings(API_LIST_LIMIT=10)
1009
    def test_move_dir(self):
1010
        folder = self.create_folder(self.container)[0]
1011
        subfolder = self.create_folder(
1012
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1013
        objects = [subfolder]
1014
        append = objects.append
1015
        for i in range(11):
1016
            append(self.upload_object(self.container,
1017
                                      '%s/%d' % (folder, i),
1018
                                      depth='1')[0])
1019
        other = self.upload_object(self.container, strnextling(folder))[0]
1020

    
1021
        # move dir
1022
        copy_folder = self.create_folder(self.container)[0]
1023
        url = join_urls(self.pithos_path, self.user, self.container,
1024
                        copy_folder)
1025
        r = self.put('%s?delimiter=/' % url, data='',
1026
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1027
        self.assertEqual(r.status_code, 201)
1028

    
1029
        for obj in objects:
1030
            # assert initial object does not exist
1031
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1032
            r = self.head(url)
1033
            self.assertEqual(r.status_code, 404)
1034

    
1035
            # assert new object was created
1036
            url = join_urls(self.pithos_path, self.user, self.container,
1037
                            obj.replace(folder, copy_folder))
1038
            r = self.head(url)
1039
            self.assertEqual(r.status_code, 200)
1040

    
1041
        # assert other has not been created under copy folder
1042
        url = join_urls(self.pithos_path, self.user, self.container,
1043
                        '%s/%s' % (copy_folder,
1044
                                   other.replace(folder, copy_folder)))
1045
        r = self.head(url)
1046
        self.assertEqual(r.status_code, 404)
1047

    
1048
    def test_move_from_other_account(self):
1049
        cname = 'c2'
1050
        self.create_container(cname, user='chuck')
1051
        self.create_container(cname, user='alice')
1052

    
1053
        # share object for read with alice
1054
        url = join_urls(self.pithos_path, self.user, self.container,
1055
                        self.object)
1056
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1057
                      HTTP_X_OBJECT_SHARING='read=alice')
1058
        self.assertEqual(r.status_code, 202)
1059

    
1060
        # assert not allowed move for chuck
1061
        oname = get_random_name()
1062
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1063
        r = self.put(url, data='', user='chuck',
1064
                     HTTP_X_OBJECT_META_TEST='testcopy',
1065
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1066
                         self.container, self.object),
1067
                     HTTP_X_SOURCE_ACCOUNT='user')
1068

    
1069
        self.assertEqual(r.status_code, 403)
1070

    
1071
        # assert no new object was created
1072
        r = self.head(url, user='chuck')
1073
        self.assertEqual(r.status_code, 404)
1074

    
1075
        # assert not allowed move for alice
1076
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1077
        r = self.put(url, data='', user='alice',
1078
                     HTTP_X_OBJECT_META_TEST='testcopy',
1079
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1080
                         self.container, self.object),
1081
                     HTTP_X_SOURCE_ACCOUNT='user')
1082
        self.assertEqual(r.status_code, 403)
1083

    
1084
        # assert no new object was created
1085
        r = self.head(url, user='alice')
1086
        self.assertEqual(r.status_code, 404)
1087

    
1088
        # share object for write with dan
1089
        url = join_urls(self.pithos_path, self.user, self.container,
1090
                        self.object)
1091
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1092
                      HTTP_X_OBJECT_SHARING='write=dan')
1093
        self.assertEqual(r.status_code, 202)
1094

    
1095
        # assert not allowed move for alice
1096
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1097
        r = self.put(url, data='', user='alice',
1098
                     HTTP_X_OBJECT_META_TEST='testcopy',
1099
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1100
                         self.container, self.object),
1101
                     HTTP_X_SOURCE_ACCOUNT='user')
1102
        self.assertEqual(r.status_code, 403)
1103

    
1104
        # assert no new object was created
1105
        r = self.head(url, user='alice')
1106
        self.assertEqual(r.status_code, 404)
1107

    
1108
        # assert not allowed move for dan
1109
        self.create_container(cname, user='dan')
1110
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1111
        r = self.put(url, data='', user='dan',
1112
                     HTTP_X_OBJECT_META_TEST='testcopy',
1113
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1114
                         self.container, self.object),
1115
                     HTTP_X_SOURCE_ACCOUNT='user')
1116
        self.assertEqual(r.status_code, 403)
1117

    
1118
        # assert no new object was created
1119
        r = self.head(url, user='dan')
1120
        self.assertEqual(r.status_code, 404)
1121

    
1122

    
1123
class ObjectCopy(PithosAPITest):
1124
    def setUp(self):
1125
        PithosAPITest.setUp(self)
1126
        self.container = 'c1'
1127
        self.create_container(self.container)
1128
        self.object, self.data = self.upload_object(self.container)[:-1]
1129

    
1130
        url = join_urls(
1131
            self.pithos_path, self.user, self.container, self.object)
1132
        r = self.head(url)
1133
        self.etag = r['X-Object-Hash']
1134

    
1135
    def test_copy(self):
1136
        with AssertMappingInvariant(self.get_object_info, self.container,
1137
                                    self.object):
1138
            oname = get_random_name()
1139
            # copy object
1140
            url = join_urls(self.pithos_path, self.user, self.container,
1141
                            self.object)
1142
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1143
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1144
                                                       oname))
1145
            # assert copy success
1146
            url = join_urls(self.pithos_path, self.user, self.container,
1147
                            oname)
1148
            self.assertEqual(r.status_code, 201)
1149

    
1150
            # assert access the new object
1151
            r = self.head(url)
1152
            self.assertEqual(r.status_code, 200)
1153
            self.assertTrue('X-Object-Meta-Test' in r)
1154
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1155

    
1156
            # assert etag is the same
1157
            self.assertTrue('X-Object-Hash' in r)
1158
            self.assertEqual(r['X-Object-Hash'], self.etag)
1159

    
1160
            # assert source object still exists
1161
            url = join_urls(self.pithos_path, self.user, self.container,
1162
                            self.object)
1163
            r = self.head(url)
1164
            self.assertEqual(r.status_code, 200)
1165

    
1166
            # assert etag is the same
1167
            self.assertTrue('X-Object-Hash' in r)
1168
            self.assertEqual(r['X-Object-Hash'], self.etag)
1169

    
1170
            r = self.get(url)
1171
            self.assertEqual(r.status_code, 200)
1172

    
1173
            # assert etag is the same
1174
            self.assertTrue('X-Object-Hash' in r)
1175
            self.assertEqual(r['X-Object-Hash'], self.etag)
1176

    
1177
            # copy object to other container (not existing)
1178
            cname = get_random_name()
1179
            url = join_urls(self.pithos_path, self.user, self.container,
1180
                            self.object)
1181
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1182
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1183

    
1184
            # assert destination container does not exist
1185
            url = join_urls(self.pithos_path, self.user, cname,
1186
                            self.object)
1187
            self.assertEqual(r.status_code, 404)
1188

    
1189
            # create container
1190
            self.create_container(cname)
1191

    
1192
            # copy object to other container (existing)
1193
            url = join_urls(self.pithos_path, self.user, self.container,
1194
                            self.object)
1195
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1196
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1197

    
1198
            # assert copy success
1199
            url = join_urls(self.pithos_path, self.user, cname,
1200
                            self.object)
1201
            self.assertEqual(r.status_code, 201)
1202

    
1203
            # assert access the new object
1204
            r = self.head(url)
1205
            self.assertEqual(r.status_code, 200)
1206
            self.assertTrue('X-Object-Meta-Test' in r)
1207
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1208

    
1209
            # assert etag is the same
1210
            self.assertTrue('X-Object-Hash' in r)
1211
            self.assertEqual(r['X-Object-Hash'], self.etag)
1212

    
1213
            # assert source object still exists
1214
            url = join_urls(self.pithos_path, self.user, self.container,
1215
                            self.object)
1216
            r = self.head(url)
1217
            self.assertEqual(r.status_code, 200)
1218

    
1219
            # assert etag is the same
1220
            self.assertTrue('X-Object-Hash' in r)
1221
            self.assertEqual(r['X-Object-Hash'], self.etag)
1222

    
1223
            r = self.get(url)
1224
            self.assertEqual(r.status_code, 200)
1225

    
1226
            # assert etag is the same
1227
            self.assertTrue('X-Object-Hash' in r)
1228
            self.assertEqual(r['X-Object-Hash'], self.etag)
1229

    
1230
    @pithos_test_settings(API_LIST_LIMIT=10)
1231
    def test_copy_dir_contents(self):
1232
        folder = self.create_folder(self.container)[0]
1233
        subfolder = self.create_folder(
1234
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1235
        objects = [subfolder]
1236
        append = objects.append
1237
        for i in range(11):
1238
            append(self.upload_object(self.container,
1239
                                      '%s/%d' % (folder, i),
1240
                                      depth='1')[0])
1241
        other = self.upload_object(self.container, strnextling(folder))[0]
1242

    
1243
        # copy dir
1244
        url = join_urls(self.pithos_path, self.user, self.container,
1245
                        folder)
1246
        copy_folder = self.create_folder(self.container)[0]
1247
        r = self.copy('%s?delimiter=/' % url,
1248
                      HTTP_X_OBJECT_META_TEST='testcopy',
1249
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1250
                                                   copy_folder))
1251
        self.assertEqual(r.status_code, 201)
1252

    
1253
        for obj in objects:
1254
            # assert object exists
1255
            url = join_urls(self.pithos_path, self.user, self.container,
1256
                            obj.replace(folder, copy_folder))
1257
            r = self.head(url)
1258
            self.assertEqual(r.status_code, 200)
1259

    
1260
        # assert other has not been created under copy folder
1261
        url = join_urls(self.pithos_path, self.user, self.container,
1262
                        '%s/%s' % (copy_folder,
1263
                                   other.replace(folder, copy_folder)))
1264
        r = self.head(url)
1265
        self.assertEqual(r.status_code, 404)
1266

    
1267
    def test_copy_to_other_account(self):
1268
        # create a container under alice account
1269
        cname = self.create_container(user='alice')[0]
1270

    
1271
        # create a folder under this container
1272
        folder = self.create_folder(cname, user='alice')[0]
1273

    
1274
        oname = get_random_name()
1275

    
1276
        # copy object to other account container
1277
        url = join_urls(self.pithos_path, self.user, self.container,
1278
                        self.object)
1279
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1280
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1281
                      HTTP_DESTINATION_ACCOUNT='alice')
1282
        self.assertEqual(r.status_code, 403)
1283

    
1284
        # share object for read with user
1285
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1286
        r = self.post(url, user='alice', content_type='',
1287
                      HTTP_CONTENT_RANGE='bytes */*',
1288
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1289
        self.assertEqual(r.status_code, 202)
1290

    
1291
        # assert copy object still is not allowed
1292
        url = join_urls(self.pithos_path, self.user, self.container,
1293
                        self.object)
1294
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1295
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1296
                      HTTP_DESTINATION_ACCOUNT='alice')
1297
        self.assertEqual(r.status_code, 403)
1298

    
1299
        # share object for write with user
1300
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1301
        r = self.post(url, user='alice',  content_type='',
1302
                      HTTP_CONTENT_RANGE='bytes */*',
1303
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1304
        self.assertEqual(r.status_code, 202)
1305

    
1306
        # assert copy object now is allowed
1307
        url = join_urls(self.pithos_path, self.user, self.container,
1308
                        self.object)
1309
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1310
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1311
                      HTTP_DESTINATION_ACCOUNT='alice')
1312
        self.assertEqual(r.status_code, 201)
1313

    
1314
        # assert access the new object
1315
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1316
        r = self.head(url, user='alice')
1317
        self.assertEqual(r.status_code, 200)
1318
        self.assertTrue('X-Object-Meta-Test' in r)
1319
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1320

    
1321
        # assert etag is the same
1322
        self.assertTrue('X-Object-Hash' in r)
1323
        self.assertEqual(r['X-Object-Hash'], self.etag)
1324

    
1325
        # assert source object still exists
1326
        url = join_urls(self.pithos_path, self.user, self.container,
1327
                        self.object)
1328
        r = self.head(url)
1329
        self.assertEqual(r.status_code, 200)
1330

    
1331
        # assert etag is the same
1332
        self.assertTrue('X-Object-Hash' in r)
1333
        self.assertEqual(r['X-Object-Hash'], self.etag)
1334

    
1335
        r = self.get(url)
1336
        self.assertEqual(r.status_code, 200)
1337

    
1338
        # assert etag is the same
1339
        self.assertTrue('X-Object-Hash' in r)
1340
        self.assertEqual(r['X-Object-Hash'], self.etag)
1341

    
1342

    
1343
class ObjectMove(PithosAPITest):
1344
    def setUp(self):
1345
        PithosAPITest.setUp(self)
1346
        self.container = 'c1'
1347
        self.create_container(self.container)
1348
        self.object, self.data = self.upload_object(self.container)[:-1]
1349

    
1350
        url = join_urls(
1351
            self.pithos_path, self.user, self.container, self.object)
1352
        r = self.head(url)
1353
        self.etag = r['X-Object-Hash']
1354

    
1355
    def test_move(self):
1356
        oname = get_random_name()
1357

    
1358
        # move object
1359
        url = join_urls(self.pithos_path, self.user, self.container,
1360
                        self.object)
1361
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1362
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1363
                                                   oname))
1364
        # assert move success
1365
        url = join_urls(self.pithos_path, self.user, self.container,
1366
                        oname)
1367
        self.assertEqual(r.status_code, 201)
1368

    
1369
        # assert access the new object
1370
        r = self.head(url)
1371
        self.assertEqual(r.status_code, 200)
1372
        self.assertTrue('X-Object-Meta-Test' in r)
1373
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1374

    
1375
        # assert etag is the same
1376
        self.assertTrue('X-Object-Hash' in r)
1377
        self.assertEqual(r['X-Object-Hash'], self.etag)
1378

    
1379
        # assert source object does not exist
1380
        url = join_urls(self.pithos_path, self.user, self.container,
1381
                        self.object)
1382
        r = self.head(url)
1383
        self.assertEqual(r.status_code, 404)
1384

    
1385
    @pithos_test_settings(API_LIST_LIMIT=10)
1386
    def test_move_dir_contents(self):
1387
        folder = self.create_folder(self.container)[0]
1388
        subfolder = self.create_folder(
1389
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1390
        objects = [subfolder]
1391
        append = objects.append
1392
        for i in range(11):
1393
            append(self.upload_object(self.container,
1394
                                      '%s/%d' % (folder, i),
1395
                                      depth='1')[0])
1396
        other = self.upload_object(self.container, strnextling(folder))[0]
1397

    
1398
        # copy dir
1399
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1400
        copy_folder = self.create_folder(self.container)[0]
1401
        r = self.move('%s?delimiter=/' % url,
1402
                      HTTP_X_OBJECT_META_TEST='testcopy',
1403
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1404
                                                   copy_folder))
1405
        self.assertEqual(r.status_code, 201)
1406

    
1407
        for obj in objects:
1408
            # assert object exists
1409
            url = join_urls(self.pithos_path, self.user, self.container,
1410
                            obj.replace(folder, copy_folder))
1411
            r = self.head(url)
1412
            self.assertEqual(r.status_code, 200)
1413

    
1414
        # assert other has not been created under copy folder
1415
        url = join_urls(self.pithos_path, self.user, self.container,
1416
                        '%s/%s' % (copy_folder,
1417
                                   other.replace(folder, copy_folder)))
1418
        r = self.head(url)
1419
        self.assertEqual(r.status_code, 404)
1420

    
1421
    def test_move_to_other_container(self):
1422
        # move object to other container (not existing)
1423
        cname = get_random_name()
1424
        url = join_urls(self.pithos_path, self.user, self.container,
1425
                        self.object)
1426
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1427
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1428

    
1429
        # assert destination container does not exist
1430
        url = join_urls(self.pithos_path, self.user, cname,
1431
                        self.object)
1432
        self.assertEqual(r.status_code, 404)
1433

    
1434
        # create container
1435
        self.create_container(cname)
1436

    
1437
        # move object to other container (existing)
1438
        url = join_urls(self.pithos_path, self.user, self.container,
1439
                        self.object)
1440
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1441
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1442

    
1443
        # assert move success
1444
        url = join_urls(self.pithos_path, self.user, cname,
1445
                        self.object)
1446
        self.assertEqual(r.status_code, 201)
1447

    
1448
        # assert access the new object
1449
        r = self.head(url)
1450
        self.assertEqual(r.status_code, 200)
1451
        self.assertTrue('X-Object-Meta-Test' in r)
1452
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1453

    
1454
        # assert etag is the same
1455
        self.assertTrue('X-Object-Hash' in r)
1456
        self.assertEqual(r['X-Object-Hash'], self.etag)
1457

    
1458
        # assert source object does not exist
1459
        url = join_urls(self.pithos_path, self.user, self.container,
1460
                        self.object)
1461
        r = self.head(url)
1462
        self.assertEqual(r.status_code, 404)
1463

    
1464
    def test_move_to_other_account(self):
1465
        # create a container under alice account
1466
        cname = self.create_container(user='alice')[0]
1467

    
1468
        # create a folder under this container
1469
        folder = self.create_folder(cname, user='alice')[0]
1470

    
1471
        oname = get_random_name()
1472

    
1473
        # move object to other account container
1474
        url = join_urls(self.pithos_path, self.user, self.container,
1475
                        self.object)
1476
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1477
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1478
                      HTTP_DESTINATION_ACCOUNT='alice')
1479
        self.assertEqual(r.status_code, 403)
1480

    
1481
        # share object for read with user
1482
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1483
        r = self.post(url, user='alice', content_type='',
1484
                      HTTP_CONTENT_RANGE='bytes */*',
1485
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1486
        self.assertEqual(r.status_code, 202)
1487

    
1488
        # assert move object still is not allowed
1489
        url = join_urls(self.pithos_path, self.user, self.container,
1490
                        self.object)
1491
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1492
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1493
                      HTTP_DESTINATION_ACCOUNT='alice')
1494
        self.assertEqual(r.status_code, 403)
1495

    
1496
        # share object for write with user
1497
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1498
        r = self.post(url, user='alice',  content_type='',
1499
                      HTTP_CONTENT_RANGE='bytes */*',
1500
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1501
        self.assertEqual(r.status_code, 202)
1502

    
1503
        # assert move object now is allowed
1504
        url = join_urls(self.pithos_path, self.user, self.container,
1505
                        self.object)
1506
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1507
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1508
                      HTTP_DESTINATION_ACCOUNT='alice')
1509
        self.assertEqual(r.status_code, 201)
1510

    
1511
        # assert source object does not exist
1512
        url = join_urls(self.pithos_path, self.user, self.container,
1513
                        self.object)
1514
        r = self.head(url)
1515
        self.assertEqual(r.status_code, 404)
1516

    
1517

    
1518
class ObjectPost(PithosAPITest):
1519
    def setUp(self):
1520
        PithosAPITest.setUp(self)
1521
        self.container = 'c1'
1522
        self.create_container(self.container)
1523
        self.object, self.object_data = self.upload_object(self.container)[:2]
1524

    
1525
    def test_update_meta(self):
1526
        with AssertUUidInvariant(self.get_object_info,
1527
                                 self.container,
1528
                                 self.object):
1529
            # update metadata
1530
            d = {'a' * 114: 'b' * 256}
1531
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1532
                          k, v in d.items())
1533
            url = join_urls(self.pithos_path, self.user, self.container,
1534
                            self.object)
1535
            r = self.post(url, content_type='', **kwargs)
1536
            self.assertEqual(r.status_code, 202)
1537

    
1538
            # assert metadata have been updated
1539
            meta = self.get_object_meta(self.container, self.object)
1540

    
1541
            for k, v in d.items():
1542
                self.assertTrue(k.title() in meta)
1543
                self.assertTrue(meta[k.title()], v)
1544

    
1545
            # Header key too large
1546
            d = {'a' * 115: 'b' * 256}
1547
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1548
                          k, v in d.items())
1549
            r = self.post(url, content_type='', **kwargs)
1550
            self.assertEqual(r.status_code, 400)
1551

    
1552
            # Header value too large
1553
            d = {'a' * 114: 'b' * 257}
1554
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1555
                          k, v in d.items())
1556
            r = self.post(url, content_type='', **kwargs)
1557
            self.assertEqual(r.status_code, 400)
1558

    
1559
#            # Check utf-8 meta
1560
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1561
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1562
#                          k, v in d.items())
1563
#            url = join_urls(self.pithos_path, self.user, self.container,
1564
#                            self.object)
1565
#            r = self.post(url, content_type='', **kwargs)
1566
#            self.assertEqual(r.status_code, 202)
1567
#
1568
#            # assert metadata have been updated
1569
#            meta = self.get_object_meta(self.container, self.object)
1570
#
1571
#            for k, v in d.items():
1572
#                key = 'X-Object-Meta-%s' % k.title()
1573
#                self.assertTrue(key in meta)
1574
#                self.assertTrue(meta[key], v)
1575
#
1576
#            # Header key too large
1577
#            d = {'α' * 114: 'β' * (256 / 2)}
1578
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1579
#                          k, v in d.items())
1580
#            r = self.post(url, content_type='', **kwargs)
1581
#            self.assertEqual(r.status_code, 400)
1582
#
1583
#            # Header value too large
1584
#            d = {'α' * 114: 'β' * 256}
1585
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1586
#                          k, v in d.items())
1587
#            r = self.udpate(url, content_type='', **kwargs)
1588
#            self.assertEqual(r.status_code, 400)
1589

    
1590
    def test_update_object(self):
1591
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1592
        oname, odata = self.upload_object(
1593
            self.container, length=random.randint(
1594
                block_size + 2, 2 * block_size))[:2]
1595

    
1596
        length = len(odata)
1597
        first_byte_pos = random.randint(1, block_size)
1598
        last_byte_pos = random.randint(block_size + 1, length - 1)
1599
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1600
        kwargs = {'content_type': 'application/octet-stream',
1601
                  'HTTP_CONTENT_RANGE': range}
1602

    
1603
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1604
        partial = last_byte_pos - first_byte_pos + 1
1605
        data = get_random_data(partial)
1606
        r = self.post(url, data=data, **kwargs)
1607

    
1608
        self.assertEqual(r.status_code, 204)
1609
        self.assertTrue('ETag' in r)
1610
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1611
                                     data)
1612
        if pithos_settings.UPDATE_MD5:
1613
            etag = md5_hash(updated_data)
1614
        else:
1615
            etag = merkle(updated_data)
1616
        #self.assertEqual(r['ETag'], etag)
1617

    
1618
        # check modified object
1619
        r = self.get(url)
1620

    
1621
        self.assertEqual(r.status_code, 200)
1622
        self.assertEqual(r.content, updated_data)
1623
        self.assertEqual(etag, r['ETag'])
1624

    
1625
    def test_update_object_divided_by_blocksize(self):
1626
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1627
        oname, odata = self.upload_object(self.container,
1628
                                          length=2 * block_size)[:2]
1629

    
1630
        length = len(odata)
1631
        first_byte_pos = block_size
1632
        last_byte_pos = 2 * block_size - 1
1633
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1634
        kwargs = {'content_type': 'application/octet-stream',
1635
                  'HTTP_CONTENT_RANGE': range}
1636

    
1637
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1638
        partial = last_byte_pos - first_byte_pos + 1
1639
        data = get_random_data(partial)
1640
        r = self.post(url, data=data, **kwargs)
1641

    
1642
        self.assertEqual(r.status_code, 204)
1643
        self.assertTrue('ETag' in r)
1644
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1645
                                     data)
1646
        if pithos_settings.UPDATE_MD5:
1647
            etag = md5_hash(updated_data)
1648
        else:
1649
            etag = merkle(updated_data)
1650
        #self.assertEqual(r['ETag'], etag)
1651

    
1652
        # check modified object
1653
        r = self.get(url)
1654

    
1655
        self.assertEqual(r.status_code, 200)
1656
        self.assertEqual(r.content, updated_data)
1657
        self.assertEqual(etag, r['ETag'])
1658

    
1659
    def test_update_object_invalid_content_length(self):
1660
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1661
        oname, odata = self.upload_object(
1662
            self.container, length=random.randint(
1663
                block_size + 1, 2 * block_size))[:2]
1664

    
1665
        length = len(odata)
1666
        first_byte_pos = random.randint(1, block_size)
1667
        last_byte_pos = random.randint(block_size + 1, length - 1)
1668
        partial = last_byte_pos - first_byte_pos + 1
1669
        data = get_random_data(partial)
1670
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1671
        kwargs = {'content_type': 'application/octet-stream',
1672
                  'HTTP_CONTENT_RANGE': range,
1673
                  'CONTENT_LENGTH': str(partial + 1)}
1674

    
1675
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1676
        r = self.post(url, data=data, **kwargs)
1677

    
1678
        self.assertEqual(r.status_code, 400)
1679

    
1680
    def test_update_object_invalid_range(self):
1681
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1682
        oname, odata = self.upload_object(
1683
            self.container, length=random.randint(block_size + 1,
1684
                                                  2 * block_size))[:2]
1685

    
1686
        length = len(odata)
1687
        first_byte_pos = random.randint(1, block_size)
1688
        last_byte_pos = first_byte_pos - 1
1689
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1690
        kwargs = {'content_type': 'application/octet-stream',
1691
                  'HTTP_CONTENT_RANGE': range}
1692

    
1693
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1694
        r = self.post(url, data=get_random_data(), **kwargs)
1695

    
1696
        self.assertEqual(r.status_code, 416)
1697

    
1698
    def test_update_object_out_of_limits(self):
1699
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1700
        oname, odata = self.upload_object(
1701
            self.container, length=random.randint(block_size + 1,
1702
                                                  2 * block_size))[:2]
1703

    
1704
        length = len(odata)
1705
        first_byte_pos = random.randint(1, block_size)
1706
        last_byte_pos = length + 1
1707
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1708
        kwargs = {'content_type': 'application/octet-stream',
1709
                  'HTTP_CONTENT_RANGE': range}
1710

    
1711
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1712
        r = self.post(url, data=get_random_data(), **kwargs)
1713

    
1714
        self.assertEqual(r.status_code, 416)
1715

    
1716
    def test_append(self):
1717
        data = get_random_data()
1718
        length = len(data)
1719
        url = join_urls(self.pithos_path, self.user, self.container,
1720
                        self.object)
1721
        r = self.post(url, data=data, content_type='application/octet-stream',
1722
                      HTTP_CONTENT_LENGTH=str(length),
1723
                      HTTP_CONTENT_RANGE='bytes */*')
1724
        self.assertEqual(r.status_code, 204)
1725

    
1726
        r = self.get(url)
1727
        content = r.content
1728
        self.assertEqual(len(content), len(self.object_data) + length)
1729
        self.assertEqual(content, self.object_data + data)
1730

    
1731
    # TODO Fix the test
1732
    def _test_update_with_chunked_transfer(self):
1733
        data = get_random_data()
1734
        length = len(data)
1735

    
1736
        url = join_urls(self.pithos_path, self.user, self.container,
1737
                        self.object)
1738
        r = self.post(url, data=data, content_type='application/octet-stream',
1739
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1740
                      HTTP_TRANSFER_ENCODING='chunked')
1741
        self.assertEqual(r.status_code, 204)
1742

    
1743
        # check modified object
1744
        r = self.get(url)
1745
        content = r.content
1746
        self.assertEqual(content[0:length], data)
1747
        self.assertEqual(content[length:], self.object_data[length:])
1748

    
1749
    def test_update_from_other_object(self):
1750
        src = self.object
1751
        dest = get_random_name()
1752

    
1753
        url = join_urls(self.pithos_path, self.user, self.container, src)
1754
        r = self.get(url)
1755
        source_data = r.content
1756
        source_meta = self.get_object_info(self.container, src)
1757

    
1758
        # update zero length object
1759
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1760
        r = self.put(url, data='')
1761
        self.assertEqual(r.status_code, 201)
1762

    
1763
        r = self.post(url,
1764
                      HTTP_CONTENT_RANGE='bytes */*',
1765
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1766
        self.assertEqual(r.status_code, 204)
1767

    
1768
        r = self.get(url)
1769
        dest_data = r.content
1770
        dest_meta = self.get_object_info(self.container, dest)
1771

    
1772
        self.assertEqual(source_data, dest_data)
1773
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1774
        self.assertEqual(source_meta['X-Object-Hash'],
1775
                         dest_meta['X-Object-Hash'])
1776
        self.assertTrue(
1777
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1778

    
1779
    def test_update_range_from_other_object(self):
1780
        src = self.object
1781
        dest = get_random_name()
1782

    
1783
        url = join_urls(self.pithos_path, self.user, self.container, src)
1784
        r = self.get(url)
1785
        source_data = r.content
1786
        source_length = len(source_data)
1787

    
1788
        # update zero length object
1789
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1790
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1791
        initial_data = get_random_data(
1792
            length=source_length + random.randint(0, block_size))
1793

    
1794
        r = self.put(url, data=initial_data)
1795
        self.assertEqual(r.status_code, 201)
1796

    
1797
        offset = random.randint(0, source_length - 1)
1798
        upto = random.randint(offset, source_length - 1)
1799
        r = self.post(url,
1800
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1801
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1802
        self.assertEqual(r.status_code, 204)
1803

    
1804
        r = self.get(url)
1805
        content = r.content
1806
        self.assertEqual(content, (initial_data[:offset] +
1807
                                   source_data[:upto - offset + 1] +
1808
                                   initial_data[upto + 1:]))
1809

    
1810
    def test_update_range_from_invalid_other_object(self):
1811
        src = self.object
1812
        dest = get_random_name()
1813

    
1814
        url = join_urls(self.pithos_path, self.user, self.container, src)
1815
        r = self.get(url)
1816

    
1817
        # update zero length object
1818
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1819
        initial_data = get_random_data()
1820
        length = len(initial_data)
1821
        r = self.put(url, data=initial_data)
1822
        self.assertEqual(r.status_code, 201)
1823

    
1824
        offset = random.randint(1, length - 2)
1825
        upto = random.randint(offset, length - 1)
1826

    
1827
        # source object does not start with /
1828
        r = self.post(url,
1829
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1830
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1831
        self.assertEqual(r.status_code, 400)
1832

    
1833
        # source object does not exist
1834
        r = self.post(url,
1835
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1836
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1837
        self.assertEqual(r.status_code, 404)
1838

    
1839
    def test_restore_version(self):
1840
        info = self.get_object_info(self.container, self.object)
1841
        v = []
1842
        append = v.append
1843
        append((info['X-Object-Version'],
1844
                int(info['Content-Length']),
1845
                self.object_data))
1846

    
1847
        # update object
1848
        data, r = self.upload_object(self.container, self.object,
1849
                                     length=v[0][1] - 1)[1:]
1850
        self.assertTrue('X-Object-Version' in r)
1851
        append((r['X-Object-Version'], len(data), data))
1852
        # v[0][1] > v[1][1]
1853

    
1854
        # update with the previous version
1855
        url = join_urls(self.pithos_path, self.user, self.container,
1856
                        self.object)
1857
        r = self.post(url,
1858
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1859
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1860
                                                       self.object),
1861
                      HTTP_X_SOURCE_VERSION=v[0][0],
1862
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1863
        self.assertEqual(r.status_code, 204)
1864
        # v[2][1] = v[0][1] > v[1][1]
1865

    
1866
        # check content
1867
        r = self.get(url)
1868
        content = r.content
1869
        self.assertEqual(len(content), v[0][1])
1870
        self.assertEqual(content, self.object_data)
1871
        append((r['X-Object-Version'], len(content), content))
1872

    
1873
        # update object content(v4) > content(v2)
1874
        data, r = self.upload_object(self.container, self.object,
1875
                                     length=v[2][1] + 1)[1:]
1876
        self.assertTrue('X-Object-Version' in r)
1877
        append((r['X-Object-Version'], len(data), data))
1878
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1879

    
1880
        # update with the previous version
1881
        r = self.post(url,
1882
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1883
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1884
                                                       self.object),
1885
                      HTTP_X_SOURCE_VERSION=v[2][0],
1886
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1887
        self.assertEqual(r.status_code, 204)
1888
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1889

    
1890
        # check content
1891
        r = self.get(url)
1892
        data = r.content
1893
        self.assertEqual(data, v[2][2])
1894
        append((r['X-Object-Version'], len(data), data))
1895

    
1896
    def test_update_from_other_version(self):
1897
        versions = []
1898
        info = self.get_object_info(self.container, self.object)
1899
        versions.append(info['X-Object-Version'])
1900
        pre_length = int(info['Content-Length'])
1901

    
1902
        # update object
1903
        d1, r = self.upload_object(self.container, self.object,
1904
                                   length=pre_length - 1)[1:]
1905
        self.assertTrue('X-Object-Version' in r)
1906
        versions.append(r['X-Object-Version'])
1907

    
1908
        # update object
1909
        d2, r = self.upload_object(self.container, self.object,
1910
                                   length=pre_length - 2)[1:]
1911
        self.assertTrue('X-Object-Version' in r)
1912
        versions.append(r['X-Object-Version'])
1913

    
1914
        # get previous version
1915
        url = join_urls(self.pithos_path, self.user, self.container,
1916
                        self.object)
1917
        r = self.get('%s?version=list&format=json' % url)
1918
        self.assertEqual(r.status_code, 200)
1919
        l = json.loads(r.content)['versions']
1920
        self.assertEqual(len(l), 3)
1921
        self.assertEqual([str(v[0]) for v in l], versions)
1922

    
1923
        # update with the previous version
1924
        r = self.post(url,
1925
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1926
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1927
                                                       self.object),
1928
                      HTTP_X_SOURCE_VERSION=versions[0])
1929
        self.assertEqual(r.status_code, 204)
1930

    
1931
        # check content
1932
        r = self.get(url)
1933
        content = r.content
1934
        self.assertEqual(len(content), pre_length)
1935
        self.assertEqual(content, self.object_data)
1936

    
1937
        # update object
1938
        d3, r = self.upload_object(self.container, self.object,
1939
                                   length=len(d2) + 1)[1:]
1940
        self.assertTrue('X-Object-Version' in r)
1941
        versions.append(r['X-Object-Version'])
1942

    
1943
        # update with the previous version
1944
        r = self.post(url,
1945
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1946
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1947
                                                       self.object),
1948
                      HTTP_X_SOURCE_VERSION=versions[-2])
1949
        self.assertEqual(r.status_code, 204)
1950

    
1951
        # check content
1952
        r = self.get(url)
1953
        content = r.content
1954
        self.assertEqual(content, d2 + d3[-1])
1955

    
1956
    def test_update_invalid_permissions(self):
1957
        url = join_urls(self.pithos_path, self.user, self.container,
1958
                        self.object)
1959
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1960
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1961
        self.assertEqual(r.status_code, 400)
1962

    
1963
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1964
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1965
        self.assertEqual(r.status_code, 400)
1966

    
1967
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1968
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1969
        self.assertEqual(r.status_code, 400)
1970

    
1971

    
1972
class ObjectDelete(PithosAPITest):
1973
    def setUp(self):
1974
        PithosAPITest.setUp(self)
1975
        self.container = 'c1'
1976
        self.create_container(self.container)
1977
        self.object, self.object_data = self.upload_object(self.container)[:2]
1978

    
1979
    def test_delete(self):
1980
        url = join_urls(self.pithos_path, self.user, self.container,
1981
                        self.object)
1982
        r = self.delete(url)
1983
        self.assertEqual(r.status_code, 204)
1984

    
1985
        r = self.head(url)
1986
        self.assertEqual(r.status_code, 404)
1987

    
1988
    def test_delete_non_existent(self):
1989
        url = join_urls(self.pithos_path, self.user, self.container,
1990
                        get_random_name())
1991
        r = self.delete(url)
1992
        self.assertEqual(r.status_code, 404)
1993

    
1994
    @pithos_test_settings(API_LIST_LIMIT=10)
1995
    def test_delete_dir(self):
1996
        folder = self.create_folder(self.container)[0]
1997
        subfolder = self.create_folder(
1998
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1999
        objects = [subfolder]
2000
        append = objects.append
2001
        for i in range(11):
2002
            append(self.upload_object(self.container,
2003
                                      '%s/%d' % (folder, i),
2004
                                      depth='1')[0])
2005
        other = self.upload_object(self.container, strnextling(folder))[0]
2006

    
2007
        # move dir
2008
        url = join_urls(self.pithos_path, self.user, self.container, folder)
2009
        r = self.delete('%s?delimiter=/' % url)
2010
        self.assertEqual(r.status_code, 204)
2011

    
2012
        for obj in objects:
2013
            # assert object does not exist
2014
            url = join_urls(self.pithos_path, self.user, self.container, obj)
2015
            r = self.head(url)
2016
            self.assertEqual(r.status_code, 404)
2017

    
2018
        # assert other has not been deleted
2019
        url = join_urls(self.pithos_path, self.user, self.container, other)
2020
        r = self.head(url)
2021
        self.assertEqual(r.status_code, 200)