Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 54b7d9b0

History | View | Annotate | Download (78.2 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote, unquote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS, pithos_test_settings)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def test_get_object_meta(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, self.user, cname, oname)
68
        r = self.head(url)
69

    
70
        mandatory = ['Etag',
71
                     'Content-Length',
72
                     'Content-Type',
73
                     'Last-Modified',
74
                     'X-Object-Hash',
75
                     'X-Object-UUID',
76
                     'X-Object-Version',
77
                     'X-Object-Version-Timestamp',
78
                     'X-Object-Modified-By']
79
        for i in mandatory:
80
            self.assertTrue(i in r)
81

    
82
        r = self.post(url, content_type='',
83
                      HTTP_CONTENT_ENCODING='gzip',
84
                      HTTP_CONTENT_DISPOSITION=(
85
                          'attachment; filename="%s"' % oname))
86
        self.assertEqual(r.status_code, 202)
87

    
88
        r = self.head(url)
89
        for i in mandatory:
90
            self.assertTrue(i in r)
91
        self.assertTrue('Content-Encoding' in r)
92
        self.assertEqual(r['Content-Encoding'], 'gzip')
93
        self.assertTrue('Content-Disposition' in r)
94
        self.assertEqual(unquote(r['Content-Disposition']),
95
                         'attachment; filename="%s"' % oname)
96

    
97
        prefix = 'myobject/'
98
        data = ''
99
        for i in range(random.randint(2, 10)):
100
            part = '%s%d' % (prefix, i)
101
            data += self.upload_object(cname, oname=part)[1]
102

    
103
        manifest = '%s/%s' % (cname, prefix)
104
        oname = get_random_name()
105
        url = join_urls(self.pithos_path, self.user, cname, oname)
106
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107
        self.assertEqual(r.status_code, 201)
108

    
109
        r = self.head(url)
110
        for i in mandatory:
111
            self.assertTrue(i in r)
112
        self.assertTrue('X-Object-Manifest' in r)
113
        self.assertEqual(r['X-Object-Manifest'], manifest)
114

    
115

    
116
class ObjectGet(PithosAPITest):
117
    def setUp(self):
118
        PithosAPITest.setUp(self)
119
        self.containers = ['c1', 'c2']
120

    
121
        # create some containers
122
        for c in self.containers:
123
            self.create_container(c)
124

    
125
        # upload files
126
        self.objects = defaultdict(list)
127
        self.objects['c1'].append(self.upload_object('c1')[0])
128

    
129
    def test_versions(self):
130
        c = 'c1'
131
        o = self.objects[c][0]
132
        url = join_urls(self.pithos_path, self.user, c, o)
133

    
134
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135
        r = self.post(url, content_type='', **meta)
136
        self.assertEqual(r.status_code, 202)
137

    
138
        url = join_urls(self.pithos_path, self.user, c, o)
139
        r = self.get('%s?version=list&format=json' % url)
140
        self.assertEqual(r.status_code, 200)
141
        l1 = json.loads(r.content)['versions']
142
        self.assertEqual(len(l1), 2)
143

    
144
        # update meta
145
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147
        r = self.post(url, content_type='', **meta)
148
        self.assertEqual(r.status_code, 202)
149

    
150
        # assert a newly created version has been created
151
        r = self.get('%s?version=list&format=json' % url)
152
        self.assertEqual(r.status_code, 200)
153
        l2 = json.loads(r.content)['versions']
154
        self.assertEqual(len(l2), len(l1) + 1)
155
        self.assertEqual(l2[:-1], l1)
156

    
157
        vserial, _ = l2[-2]
158
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159
                         {'Quality': 'AAA'})
160

    
161
        # update data
162
        self.append_object_data(c, o)
163

    
164
        # assert a newly created version has been created
165
        r = self.get('%s?version=list&format=json' % url)
166
        self.assertEqual(r.status_code, 200)
167
        l3 = json.loads(r.content)['versions']
168
        self.assertEqual(len(l3), len(l2) + 1)
169
        self.assertEqual(l3[:-1], l2)
170

    
171
    def test_get_version(self):
172
        c = 'c1'
173
        o = self.objects[c][0]
174
        url = join_urls(self.pithos_path, self.user, c, o)
175

    
176
        # Update metadata
177
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178
        r = self.post(url, content_type='', **meta)
179
        self.assertEqual(r.status_code, 202)
180

    
181
        url = join_urls(self.pithos_path, self.user, c, o)
182
        r = self.get('%s?version=list&format=json' % url)
183
        self.assertEqual(r.status_code, 200)
184
        l = json.loads(r.content)['versions']
185
        self.assertEqual(len(l), 2)
186

    
187
        r = self.head('%s?version=%s' % (url, l[0][0]))
188
        self.assertEqual(r.status_code, 200)
189
        self.assertTrue('X-Object-Meta-Quality' not in r)
190

    
191
        r = self.head('%s?version=%s' % (url, l[1][0]))
192
        self.assertEqual(r.status_code, 200)
193
        self.assertTrue('X-Object-Meta-Quality' in r)
194

    
195
        # test invalid version
196
        r = self.head('%s?version=-1' % url)
197
        self.assertEqual(r.status_code, 404)
198

    
199
        other_name, other_data, r = self.upload_object(c)
200
        self.assertTrue('X-Object-Version' in r)
201
        other_version = r['X-Object-Version']
202

    
203
        self.assertTrue(o != other_name)
204

    
205
        r = self.get('%s?version=%s' % (url, other_version))
206
        self.assertEqual(r.status_code, 404)
207

    
208
        r = self.head('%s?version=%s' % (url, other_version))
209
        self.assertEqual(r.status_code, 404)
210

    
211
    def test_objects_with_trailing_spaces(self):
212
        # create object
213
        oname = self.upload_object('c1')[0]
214
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215

    
216
        r = self.get(quote('%s ' % url))
217
        self.assertEqual(r.status_code, 404)
218

    
219
        # delete object
220
        self.delete(url)
221

    
222
        r = self.get(url)
223
        self.assertEqual(r.status_code, 404)
224

    
225
        # upload object with trailing space
226
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227

    
228
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229
        r = self.get(url)
230
        self.assertEqual(r.status_code, 200)
231

    
232
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233
        r = self.get(url)
234
        self.assertEqual(r.status_code, 404)
235

    
236
    def test_get_partial(self):
237
        cname = self.containers[0]
238
        oname, odata = self.upload_object(cname, length=512)[:-1]
239
        url = join_urls(self.pithos_path, self.user, cname, oname)
240
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241
        self.assertEqual(r.status_code, 206)
242
        data = r.content
243
        self.assertEqual(data, odata[:500])
244
        self.assertTrue('Content-Range' in r)
245
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246
        self.assertTrue('Content-Type' in r)
247
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248

    
249
    def test_get_final_500(self):
250
        cname = self.containers[0]
251
        oname, odata = self.upload_object(cname, length=512)[:-1]
252
        size = len(odata)
253
        url = join_urls(self.pithos_path, self.user, cname, oname)
254
        r = self.get(url, HTTP_RANGE='bytes=-500')
255
        self.assertEqual(r.status_code, 206)
256
        self.assertEqual(r.content, odata[-500:])
257
        self.assertTrue('Content-Range' in r)
258
        self.assertEqual(r['Content-Range'],
259
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260
        self.assertTrue('Content-Type' in r)
261
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262

    
263
    def test_get_rest(self):
264
        cname = self.containers[0]
265
        oname, odata = self.upload_object(cname, length=512)[:-1]
266
        size = len(odata)
267
        url = join_urls(self.pithos_path, self.user, cname, oname)
268
        offset = len(odata) - random.randint(1, 512)
269
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270
        self.assertEqual(r.status_code, 206)
271
        self.assertEqual(r.content, odata[offset:])
272
        self.assertTrue('Content-Range' in r)
273
        self.assertEqual(r['Content-Range'],
274
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275
        self.assertTrue('Content-Type' in r)
276
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277

    
278
    def test_get_range_not_satisfiable(self):
279
        cname = self.containers[0]
280
        oname, odata = self.upload_object(cname, length=512)[:-1]
281
        url = join_urls(self.pithos_path, self.user, cname, oname)
282

    
283
        # TODO
284
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285
        #self.assertEqual(r.status_code, 416)
286

    
287
        offset = len(odata) + 1
288
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289
        self.assertEqual(r.status_code, 416)
290

    
291
    def test_multiple_range(self):
292
        cname = self.containers[0]
293
        oname, odata = self.upload_object(cname)[:-1]
294
        url = join_urls(self.pithos_path, self.user, cname, oname)
295

    
296
        l = ['0-499', '-500', '1000-']
297
        ranges = 'bytes=%s' % ','.join(l)
298
        r = self.get(url, HTTP_RANGE=ranges)
299
        self.assertEqual(r.status_code, 206)
300
        self.assertTrue('content-type' in r)
301
        p = re.compile(
302
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303
            re.I)
304
        m = p.match(r['content-type'])
305
        if m is None:
306
            self.fail('Invalid multiple range content type')
307
        boundary = m.groupdict()['boundary']
308
        cparts = r.content.split('--%s' % boundary)[1:-1]
309

    
310
        # assert content parts length
311
        self.assertEqual(len(cparts), len(l))
312

    
313
        # for each content part assert headers
314
        i = 0
315
        for cpart in cparts:
316
            content = cpart.split('\r\n')
317
            headers = content[1:3]
318
            content_range = headers[0].split(': ')
319
            self.assertEqual(content_range[0], 'Content-Range')
320

    
321
            r = l[i].split('-')
322
            if not r[0] and not r[1]:
323
                pass
324
            elif not r[0]:
325
                start = len(odata) - int(r[1])
326
                end = len(odata)
327
            elif not r[1]:
328
                start = int(r[0])
329
                end = len(odata)
330
            else:
331
                start = int(r[0])
332
                end = int(r[1]) + 1
333
            fdata = odata[start:end]
334
            sdata = '\r\n'.join(content[4:-1])
335
            self.assertEqual(len(fdata), len(sdata))
336
            self.assertEquals(fdata, sdata)
337
            i += 1
338

    
339
    def test_multiple_range_not_satisfiable(self):
340
        # perform get with multiple range
341
        cname = self.containers[0]
342
        oname, odata = self.upload_object(cname)[:-1]
343
        out_of_range = len(odata) + 1
344
        l = ['0-499', '-500', '%d-' % out_of_range]
345
        ranges = 'bytes=%s' % ','.join(l)
346
        url = join_urls(self.pithos_path, self.user, cname, oname)
347
        r = self.get(url, HTTP_RANGE=ranges)
348
        self.assertEqual(r.status_code, 416)
349

    
350
    def test_get_if_match(self):
351
        cname = self.containers[0]
352
        oname, odata = self.upload_object(cname)[:-1]
353

    
354
        # perform get with If-Match
355
        url = join_urls(self.pithos_path, self.user, cname, oname)
356

    
357
        if pithos_settings.UPDATE_MD5:
358
            etag = md5_hash(odata)
359
        else:
360
            etag = merkle(odata)
361

    
362
        r = self.get(url, HTTP_IF_MATCH=etag)
363

    
364
        # assert get success
365
        self.assertEqual(r.status_code, 200)
366

    
367
        # assert response content
368
        self.assertEqual(r.content, odata)
369

    
370
    def test_get_if_match_star(self):
371
        cname = self.containers[0]
372
        oname, odata = self.upload_object(cname)[:-1]
373

    
374
        # perform get with If-Match *
375
        url = join_urls(self.pithos_path, self.user, cname, oname)
376
        r = self.get(url, HTTP_IF_MATCH='*')
377

    
378
        # assert get success
379
        self.assertEqual(r.status_code, 200)
380

    
381
        # assert response content
382
        self.assertEqual(r.content, odata)
383

    
384
    def test_get_multiple_if_match(self):
385
        cname = self.containers[0]
386
        oname, odata = self.upload_object(cname)[:-1]
387

    
388
        # perform get with If-Match
389
        url = join_urls(self.pithos_path, self.user, cname, oname)
390

    
391
        if pithos_settings.UPDATE_MD5:
392
            etag = md5_hash(odata)
393
        else:
394
            etag = merkle(odata)
395

    
396
        quoted = lambda s: '"%s"' % s
397
        r = self.get(url, HTTP_IF_MATCH=','.join(
398
            [quoted(etag), quoted(get_random_data(64))]))
399

    
400
        # assert get success
401
        self.assertEqual(r.status_code, 200)
402

    
403
        # assert response content
404
        self.assertEqual(r.content, odata)
405

    
406
    def test_if_match_precondition_failed(self):
407
        cname = self.containers[0]
408
        oname, odata = self.upload_object(cname)[:-1]
409

    
410
        # perform get with If-Match
411
        url = join_urls(self.pithos_path, self.user, cname, oname)
412
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413
        self.assertEqual(r.status_code, 412)
414

    
415
    def test_if_none_match(self):
416
        # upload object
417
        cname = self.containers[0]
418
        oname, odata = self.upload_object(cname)[:-1]
419

    
420
        if pithos_settings.UPDATE_MD5:
421
            etag = md5_hash(odata)
422
        else:
423
            etag = merkle(odata)
424

    
425
        # perform get with If-None-Match
426
        url = join_urls(self.pithos_path, self.user, cname, oname)
427
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428

    
429
        # assert precondition_failed
430
        self.assertEqual(r.status_code, 304)
431

    
432
        # update object data
433
        r = self.append_object_data(cname, oname)[-1]
434
        self.assertTrue(etag != r['ETag'])
435

    
436
        # perform get with If-None-Match
437
        url = join_urls(self.pithos_path, self.user, cname, oname)
438
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439

    
440
        # assert get success
441
        self.assertEqual(r.status_code, 200)
442

    
443
    def test_if_none_match_star(self):
444
        # upload object
445
        cname = self.containers[0]
446
        oname, odata = self.upload_object(cname)[:-1]
447

    
448
        # perform get with If-None-Match with star
449
        url = join_urls(self.pithos_path, self.user, cname, oname)
450
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451

    
452
        self.assertEqual(r.status_code, 304)
453

    
454
    def test_if_modified_since(self):
455
        # upload object
456
        cname = self.containers[0]
457
        oname, odata = self.upload_object(cname)[:-1]
458
        object_info = self.get_object_info(cname, oname)
459
        last_modified = object_info['Last-Modified']
460
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461
        t1_formats = map(t1.strftime, DATE_FORMATS)
462

    
463
        # Check not modified since
464
        url = join_urls(self.pithos_path, self.user, cname, oname)
465
        for t in t1_formats:
466
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467
            self.assertEqual(r.status_code, 304)
468

    
469
        _time.sleep(1)
470

    
471
        # update object data
472
        appended_data = self.append_object_data(cname, oname)[1]
473

    
474
        # Check modified since
475
        url = join_urls(self.pithos_path, self.user, cname, oname)
476
        for t in t1_formats:
477
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478
            self.assertEqual(r.status_code, 200)
479
            self.assertEqual(r.content, odata + appended_data)
480

    
481
    def test_if_modified_since_invalid_date(self):
482
        cname = self.containers[0]
483
        oname, odata = self.upload_object(cname)[:-1]
484
        url = join_urls(self.pithos_path, self.user, cname, oname)
485
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486
        self.assertEqual(r.status_code, 200)
487
        self.assertEqual(r.content, odata)
488

    
489
    def test_if_not_modified_since(self):
490
        cname = self.containers[0]
491
        oname, odata = self.upload_object(cname)[:-1]
492
        url = join_urls(self.pithos_path, self.user, cname, oname)
493
        object_info = self.get_object_info(cname, oname)
494
        last_modified = object_info['Last-Modified']
495
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496

    
497
        # Check unmodified
498
        t1 = t + datetime.timedelta(seconds=1)
499
        t1_formats = map(t1.strftime, DATE_FORMATS)
500
        for t in t1_formats:
501
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502
            self.assertEqual(r.status_code, 200)
503
            self.assertEqual(r.content, odata)
504

    
505
        # modify object
506
        _time.sleep(2)
507
        self.append_object_data(cname, oname)
508

    
509
        object_info = self.get_object_info(cname, oname)
510
        last_modified = object_info['Last-Modified']
511
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512
        t2 = t - datetime.timedelta(seconds=1)
513
        t2_formats = map(t2.strftime, DATE_FORMATS)
514

    
515
        # check modified
516
        for t in t2_formats:
517
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518
            self.assertEqual(r.status_code, 412)
519

    
520
        # modify account: update object meta
521
        _time.sleep(1)
522
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523

    
524
        object_info = self.get_object_info(cname, oname)
525
        last_modified = object_info['Last-Modified']
526
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527
        t3 = t - datetime.timedelta(seconds=1)
528
        t3_formats = map(t3.strftime, DATE_FORMATS)
529

    
530
        # check modified
531
        for t in t3_formats:
532
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533
            self.assertEqual(r.status_code, 412)
534

    
535
    def test_if_unmodified_since(self):
536
        cname = self.containers[0]
537
        oname, odata = self.upload_object(cname)[:-1]
538
        url = join_urls(self.pithos_path, self.user, cname, oname)
539
        object_info = self.get_object_info(cname, oname)
540
        last_modified = object_info['Last-Modified']
541
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542
        t = t + datetime.timedelta(seconds=1)
543
        t_formats = map(t.strftime, DATE_FORMATS)
544

    
545
        for tf in t_formats:
546
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547
            self.assertEqual(r.status_code, 200)
548
            self.assertEqual(r.content, odata)
549

    
550
    def test_if_unmodified_since_precondition_failed(self):
551
        cname = self.containers[0]
552
        oname, odata = self.upload_object(cname)[:-1]
553
        url = join_urls(self.pithos_path, self.user, cname, oname)
554
        object_info = self.get_object_info(cname, oname)
555
        last_modified = object_info['Last-Modified']
556
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557
        t = t - datetime.timedelta(seconds=1)
558
        t_formats = map(t.strftime, DATE_FORMATS)
559

    
560
        for tf in t_formats:
561
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562
            self.assertEqual(r.status_code, 412)
563

    
564
    def test_hashes(self):
565
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566
        cname = self.containers[0]
567
        oname, odata = self.upload_object(cname, length=l)[:-1]
568
        size = len(odata)
569

    
570
        url = join_urls(self.pithos_path, self.user, cname, oname)
571
        r = self.get('%s?format=json&hashmap' % url)
572
        self.assertEqual(r.status_code, 200)
573
        body = json.loads(r.content)
574

    
575
        hashes = body['hashes']
576
        block_size = body['block_size']
577
        block_num = size / block_size if size / block_size == 0 else\
578
            size / block_size + 1
579
        self.assertTrue(len(hashes), block_num)
580
        i = 0
581
        for h in hashes:
582
            start = i * block_size
583
            end = (i + 1) * block_size
584
            hash = merkle(odata[start:end])
585
            self.assertEqual(h, hash)
586
            i += 1
587

    
588

    
589
class ObjectPut(PithosAPITest):
590
    def setUp(self):
591
        PithosAPITest.setUp(self)
592
        self.container = get_random_name()
593
        self.create_container(self.container)
594

    
595
    def test_upload(self):
596
        cname = self.container
597
        oname = get_random_name()
598
        data = get_random_data()
599
        meta = {'test': 'test1'}
600
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601
                       for k, v in meta.iteritems())
602
        headers['HTTP_CONTENT_DISPOSITION'] = 'attachment; filename="%f2"'
603
        url = join_urls(self.pithos_path, self.user, cname, oname)
604
        r = self.put(url, data=data, content_type='application/pdf',
605
                     quote_extra=False, **headers)
606
        self.assertEqual(r.status_code, 400)
607

    
608
        headers['HTTP_CONTENT_DISPOSITION'] = ('attachment; filename="%s"' %
609
                                               oname)
610
        url = join_urls(self.pithos_path, self.user, cname, oname)
611
        r = self.put(url, data=data, content_type='㌀', **headers)
612
        self.assertEqual(r.status_code, 400)
613

    
614
        r = self.put(url, data=data, content_type='application/pdf', **headers)
615
        self.assertEqual(r.status_code, 201)
616
        self.assertTrue('ETag' in r)
617
        self.assertTrue('X-Object-Version' in r)
618

    
619
        info = self.get_object_info(cname, oname)
620

    
621
        # assert object meta
622
        self.assertTrue('X-Object-Meta-Test' in info)
623
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
624

    
625
        # assert content-type
626
        self.assertEqual(info['content-type'], 'application/pdf')
627

    
628
        # assert uploaded content
629
        r = self.get(url)
630
        self.assertEqual(r.status_code, 200)
631
        self.assertEqual(r.content, data)
632

    
633
    def test_maximum_upload_size_exceeds(self):
634
        cname = self.container
635
        oname = get_random_name()
636

    
637
        # set container quota to 100
638
        url = join_urls(self.pithos_path, self.user, cname)
639
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
640
        self.assertEqual(r.status_code, 202)
641

    
642
        info = self.get_container_info(cname)
643
        length = int(info['X-Container-Policy-Quota']) + 1
644

    
645
        data = get_random_data(length)
646
        url = join_urls(self.pithos_path, self.user, cname, oname)
647
        r = self.put(url, data=data)
648
        self.assertEqual(r.status_code, 413)
649

    
650
    def test_upload_with_name_containing_slash(self):
651
        cname = self.container
652
        oname = '/%s' % get_random_name()
653
        data = get_random_data()
654
        url = join_urls(self.pithos_path, self.user, cname, oname)
655
        r = self.put(url, data=data)
656
        self.assertEqual(r.status_code, 201)
657
        self.assertTrue('ETag' in r)
658
        self.assertTrue('X-Object-Version' in r)
659

    
660
        r = self.get(url)
661
        self.assertEqual(r.status_code, 200)
662
        self.assertEqual(r.content, data)
663

    
664
    def test_upload_unprocessable_entity(self):
665
        cname = self.container
666
        oname = get_random_name()
667
        data = get_random_data()
668
        url = join_urls(self.pithos_path, self.user, cname, oname)
669
        r = self.put(url, data=data, HTTP_ETAG='123')
670
        self.assertEqual(r.status_code, 422)
671

    
672
#    def test_chunked_transfer(self):
673
#        cname = self.container
674
#        oname = '/%s' % get_random_name()
675
#        data = get_random_data()
676
#        url = join_urls(self.pithos_path, self.user, cname, oname)
677
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
678
#        self.assertEqual(r.status_code, 201)
679
#        self.assertTrue('ETag' in r)
680
#        self.assertTrue('X-Object-Version' in r)
681

    
682
    def test_manifestation(self):
683
        cname = self.container
684
        prefix = 'myobject/'
685
        data = ''
686
        for i in range(random.randint(2, 10)):
687
            part = '%s%d' % (prefix, i)
688
            data += self.upload_object(cname, oname=part)[1]
689

    
690
        manifest = '%s/%s' % (cname, prefix)
691
        oname = get_random_name()
692
        url = join_urls(self.pithos_path, self.user, cname, oname)
693
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
694
        self.assertEqual(r.status_code, 201)
695

    
696
        # assert object exists
697
        r = self.get(url)
698
        self.assertEqual(r.status_code, 200)
699

    
700
        # assert its content
701
        self.assertEqual(r.content, data)
702

    
703
        # invalid manifestation
704
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
705
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
706
        r = self.get(url)
707
        self.assertEqual(r.content, '')
708

    
709
    def test_create_zero_length_object(self):
710
        cname = self.container
711
        oname = get_random_name()
712
        url = join_urls(self.pithos_path, self.user, cname, oname)
713
        r = self.put(url, data='')
714
        self.assertEqual(r.status_code, 201)
715

    
716
        r = self.get(url)
717
        self.assertEqual(r.status_code, 200)
718
        self.assertEqual(int(r['Content-Length']), 0)
719
        self.assertEqual(r.content, '')
720

    
721
        r = self.get('%s?hashmap=&format=json' % url)
722
        self.assertEqual(r.status_code, 200)
723
        body = json.loads(r.content)
724
        hashes = body['hashes']
725
        hash = merkle('')
726
        self.assertEqual(hashes, [hash])
727

    
728
    def test_create_object_by_hashmap(self):
729
        cname = self.container
730
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
731

    
732
        # upload an object
733
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
734
        # get it hashmap
735
        url = join_urls(self.pithos_path, self.user, cname, oname)
736
        r = self.get('%s?hashmap=&format=json' % url)
737

    
738
        oname = get_random_name()
739
        url = join_urls(self.pithos_path, self.user, cname, oname)
740
        r = self.put('%s?hashmap=' % url, data=r.content)
741
        self.assertEqual(r.status_code, 201)
742

    
743
        r = self.get(url)
744
        self.assertEqual(r.status_code, 200)
745
        self.assertEqual(r.content, data)
746

    
747
    def test_create_object_by_invalid_hashmap(self):
748
        cname = self.container
749
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
750

    
751
        # upload an object
752
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
753
        # get it hashmap
754
        url = join_urls(self.pithos_path, self.user, cname, oname)
755
        r = self.get('%s?hashmap=&format=json' % url)
756
        data = r.content
757
        try:
758
            hashmap = json.loads(data)
759
        except:
760
            self.fail('JSON format expected')
761

    
762
        oname = get_random_name()
763
        url = join_urls(self.pithos_path, self.user, cname, oname)
764
        hashmap['hashes'] = [get_random_name()]
765
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
766
        self.assertEqual(r.status_code, 400)
767

    
768

    
769
class ObjectPutCopy(PithosAPITest):
770
    def setUp(self):
771
        PithosAPITest.setUp(self)
772
        self.container = 'c1'
773
        self.create_container(self.container)
774
        self.object, self.data = self.upload_object(self.container)[:-1]
775

    
776
        url = join_urls(
777
            self.pithos_path, self.user, self.container, self.object)
778
        r = self.head(url)
779
        self.etag = r['X-Object-Hash']
780

    
781
    def test_copy(self):
782
        with AssertMappingInvariant(self.get_object_info, self.container,
783
                                    self.object):
784
            # copy object
785
            oname = get_random_name()
786
            url = join_urls(self.pithos_path, self.user, self.container, oname)
787
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
788
                         HTTP_X_COPY_FROM='/%s/%s' % (
789
                             self.container, self.object))
790

    
791
            # assert copy success
792
            self.assertEqual(r.status_code, 201)
793

    
794
            # assert access the new object
795
            r = self.head(url)
796
            self.assertEqual(r.status_code, 200)
797
            self.assertTrue('X-Object-Meta-Test' in r)
798
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
799

    
800
            # assert etag is the same
801
            self.assertTrue('X-Object-Hash' in r)
802
            self.assertEqual(r['X-Object-Hash'], self.etag)
803

    
804
    def test_copy_from_different_container(self):
805
        cname = 'c2'
806
        self.create_container(cname)
807
        with AssertMappingInvariant(self.get_object_info, self.container,
808
                                    self.object):
809
            oname = get_random_name()
810
            url = join_urls(self.pithos_path, self.user, cname, oname)
811
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
812
                         HTTP_X_COPY_FROM='/%s/%s' % (
813
                             self.container, self.object))
814

    
815
            # assert copy success
816
            self.assertEqual(r.status_code, 201)
817

    
818
            # assert access the new object
819
            r = self.head(url)
820
            self.assertEqual(r.status_code, 200)
821
            self.assertTrue('X-Object-Meta-Test' in r)
822
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
823

    
824
            # assert etag is the same
825
            self.assertTrue('X-Object-Hash' in r)
826
            self.assertEqual(r['X-Object-Hash'], self.etag)
827

    
828
    def test_copy_from_other_account(self):
829
        cname = 'c2'
830
        self.create_container(cname, user='chuck')
831
        self.create_container(cname, user='alice')
832

    
833
        # share object for read with alice
834
        url = join_urls(self.pithos_path, self.user, self.container,
835
                        self.object)
836
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
837
                      HTTP_X_OBJECT_SHARING='read=alice')
838
        self.assertEqual(r.status_code, 202)
839

    
840
        # assert not allowed for chuck
841
        oname = get_random_name()
842
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
843
        r = self.put(url, data='', user='chuck',
844
                     HTTP_X_OBJECT_META_TEST='testcopy',
845
                     HTTP_X_COPY_FROM='/%s/%s' % (
846
                         self.container, self.object),
847
                     HTTP_X_SOURCE_ACCOUNT='user')
848

    
849
        self.assertEqual(r.status_code, 403)
850

    
851
        # assert copy success for alice
852
        url = join_urls(self.pithos_path, 'alice', cname, oname)
853
        r = self.put(url, data='', user='alice',
854
                     HTTP_X_OBJECT_META_TEST='testcopy',
855
                     HTTP_X_COPY_FROM='/%s/%s' % (
856
                         self.container, self.object),
857
                     HTTP_X_SOURCE_ACCOUNT='user')
858
        self.assertEqual(r.status_code, 201)
859

    
860
        # assert access the new object
861
        r = self.head(url, user='alice')
862
        self.assertEqual(r.status_code, 200)
863
        self.assertTrue('X-Object-Meta-Test' in r)
864
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
865

    
866
        # assert etag is the same
867
        self.assertTrue('X-Object-Hash' in r)
868
        self.assertEqual(r['X-Object-Hash'], self.etag)
869

    
870
        # share object for write
871
        url = join_urls(self.pithos_path, self.user, self.container,
872
                        self.object)
873
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
874
                      HTTP_X_OBJECT_SHARING='write=dan')
875
        self.assertEqual(r.status_code, 202)
876

    
877
        # assert not allowed copy for alice
878
        url = join_urls(self.pithos_path, 'alice', cname, oname)
879
        r = self.put(url, data='', user='alice',
880
                     HTTP_X_OBJECT_META_TEST='testcopy',
881
                     HTTP_X_COPY_FROM='/%s/%s' % (
882
                         self.container, self.object),
883
                     HTTP_X_SOURCE_ACCOUNT='user')
884
        self.assertEqual(r.status_code, 403)
885

    
886
        # assert allowed copy for dan
887
        self.create_container(cname, user='dan')
888
        url = join_urls(self.pithos_path, 'dan', cname, oname)
889
        r = self.put(url, data='', user='dan',
890
                     HTTP_X_OBJECT_META_TEST='testcopy',
891
                     HTTP_X_COPY_FROM='/%s/%s' % (
892
                         self.container, self.object),
893
                     HTTP_X_SOURCE_ACCOUNT='user')
894
        self.assertEqual(r.status_code, 201)
895

    
896
        # assert access the new object
897
        r = self.head(url, user='dan')
898
        self.assertEqual(r.status_code, 200)
899
        self.assertTrue('X-Object-Meta-Test' in r)
900
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
901

    
902
        # assert etag is the same
903
        self.assertTrue('X-Object-Hash' in r)
904
        self.assertEqual(r['X-Object-Hash'], self.etag)
905

    
906
        # assert source object still exists
907
        url = join_urls(self.pithos_path, self.user, self.container,
908
                        self.object)
909
        r = self.head(url)
910
        self.assertEqual(r.status_code, 200)
911
        # assert etag is the same
912
        self.assertTrue('X-Object-Hash' in r)
913
        self.assertEqual(r['X-Object-Hash'], self.etag)
914

    
915
        r = self.get(url)
916
        self.assertEqual(r.status_code, 200)
917
        # assert etag is the same
918
        self.assertTrue('X-Object-Hash' in r)
919
        self.assertEqual(r['X-Object-Hash'], self.etag)
920

    
921
    def test_copy_invalid(self):
922
        # copy from non-existent object
923
        oname = get_random_name()
924
        url = join_urls(self.pithos_path, self.user, self.container, oname)
925
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
926
                     HTTP_X_COPY_FROM='/%s/%s' % (
927
                         self.container, get_random_name()))
928
        self.assertEqual(r.status_code, 404)
929

    
930
        # copy from non-existent container
931
        oname = get_random_name()
932
        url = join_urls(self.pithos_path, self.user, self.container, oname)
933
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
934
                     HTTP_X_COPY_FROM='/%s/%s' % (
935
                         get_random_name(), self.object))
936
        self.assertEqual(r.status_code, 404)
937

    
938
    @pithos_test_settings(API_LIST_LIMIT=10)
939
    def test_copy_dir(self):
940
        folder = self.create_folder(self.container)[0]
941
        subfolder = self.create_folder(
942
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
943
        objects = [subfolder]
944
        append = objects.append
945
        for i in range(11):
946
            append(self.upload_object(self.container,
947
                                      '%s/%d' % (folder, i),
948
                                      depth='1')[0])
949
        other = self.upload_object(self.container, strnextling(folder))[0]
950

    
951
        # copy dir
952
        copy_folder = self.create_folder(self.container)[0]
953
        url = join_urls(self.pithos_path, self.user, self.container,
954
                        copy_folder)
955
        r = self.put('%s?delimiter=/' % url, data='',
956
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
957
        self.assertEqual(r.status_code, 201)
958

    
959
        for obj in objects:
960
            # assert object exists
961
            url = join_urls(self.pithos_path, self.user, self.container,
962
                            obj.replace(folder, copy_folder))
963
            r = self.head(url)
964
            self.assertEqual(r.status_code, 200)
965

    
966
            # assert metadata
967
            meta = self.get_object_meta(self.container, obj)
968
            for k in meta.keys():
969
                key = 'X-Object-Meta-%s' % k
970
                self.assertTrue(key in r)
971
                self.assertEqual(r[key], meta[k])
972

    
973
        # assert other has not been created under copy folder
974
        url = join_urls(self.pithos_path, self.user, self.container,
975
                        '%s/%s' % (copy_folder,
976
                                   other.replace(folder, copy_folder)))
977
        r = self.head(url)
978
        self.assertEqual(r.status_code, 404)
979

    
980

    
981
class ObjectPutMove(PithosAPITest):
982
    def setUp(self):
983
        PithosAPITest.setUp(self)
984
        self.container = 'c1'
985
        self.create_container(self.container)
986
        self.object, self.data = self.upload_object(self.container)[:-1]
987

    
988
        url = join_urls(
989
            self.pithos_path, self.user, self.container, self.object)
990
        r = self.head(url)
991
        self.etag = r['X-Object-Hash']
992

    
993
    def test_move(self):
994
        # move object
995
        oname = get_random_name()
996
        url = join_urls(self.pithos_path, self.user, self.container, oname)
997
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
998
                     HTTP_X_MOVE_FROM='/%s/%s' % (
999
                         self.container, self.object))
1000

    
1001
        # assert move success
1002
        self.assertEqual(r.status_code, 201)
1003

    
1004
        # assert access the new object
1005
        r = self.head(url)
1006
        self.assertEqual(r.status_code, 200)
1007
        self.assertTrue('X-Object-Meta-Test' in r)
1008
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1009

    
1010
        # assert etag is the same
1011
        self.assertTrue('X-Object-Hash' in r)
1012

    
1013
        # assert the initial object has been deleted
1014
        url = join_urls(self.pithos_path, self.user, self.container,
1015
                        self.object)
1016
        r = self.head(url)
1017
        self.assertEqual(r.status_code, 404)
1018

    
1019
    @pithos_test_settings(API_LIST_LIMIT=10)
1020
    def test_move_dir(self):
1021
        folder = self.create_folder(self.container)[0]
1022
        subfolder = self.create_folder(
1023
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1024
        objects = [subfolder]
1025
        append = objects.append
1026
        for i in range(11):
1027
            append(self.upload_object(self.container,
1028
                                      '%s/%d' % (folder, i),
1029
                                      depth='1')[0])
1030
        other = self.upload_object(self.container, strnextling(folder))[0]
1031

    
1032
        # move dir
1033
        copy_folder = self.create_folder(self.container)[0]
1034
        url = join_urls(self.pithos_path, self.user, self.container,
1035
                        copy_folder)
1036
        r = self.put('%s?delimiter=/' % url, data='',
1037
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1038
        self.assertEqual(r.status_code, 201)
1039

    
1040
        for obj in objects:
1041
            # assert initial object does not exist
1042
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1043
            r = self.head(url)
1044
            self.assertEqual(r.status_code, 404)
1045

    
1046
            # assert new object was created
1047
            url = join_urls(self.pithos_path, self.user, self.container,
1048
                            obj.replace(folder, copy_folder))
1049
            r = self.head(url)
1050
            self.assertEqual(r.status_code, 200)
1051

    
1052
        # assert other has not been created under copy folder
1053
        url = join_urls(self.pithos_path, self.user, self.container,
1054
                        '%s/%s' % (copy_folder,
1055
                                   other.replace(folder, copy_folder)))
1056
        r = self.head(url)
1057
        self.assertEqual(r.status_code, 404)
1058

    
1059
    def test_move_from_other_account(self):
1060
        cname = 'c2'
1061
        self.create_container(cname, user='chuck')
1062
        self.create_container(cname, user='alice')
1063

    
1064
        # share object for read with alice
1065
        url = join_urls(self.pithos_path, self.user, self.container,
1066
                        self.object)
1067
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1068
                      HTTP_X_OBJECT_SHARING='read=alice')
1069
        self.assertEqual(r.status_code, 202)
1070

    
1071
        # assert not allowed move for chuck
1072
        oname = get_random_name()
1073
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1074
        r = self.put(url, data='', user='chuck',
1075
                     HTTP_X_OBJECT_META_TEST='testcopy',
1076
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1077
                         self.container, self.object),
1078
                     HTTP_X_SOURCE_ACCOUNT='user')
1079

    
1080
        self.assertEqual(r.status_code, 403)
1081

    
1082
        # assert no new object was created
1083
        r = self.head(url, user='chuck')
1084
        self.assertEqual(r.status_code, 404)
1085

    
1086
        # assert not allowed move for alice
1087
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1088
        r = self.put(url, data='', user='alice',
1089
                     HTTP_X_OBJECT_META_TEST='testcopy',
1090
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1091
                         self.container, self.object),
1092
                     HTTP_X_SOURCE_ACCOUNT='user')
1093
        self.assertEqual(r.status_code, 403)
1094

    
1095
        # assert no new object was created
1096
        r = self.head(url, user='alice')
1097
        self.assertEqual(r.status_code, 404)
1098

    
1099
        # share object for write with dan
1100
        url = join_urls(self.pithos_path, self.user, self.container,
1101
                        self.object)
1102
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1103
                      HTTP_X_OBJECT_SHARING='write=dan')
1104
        self.assertEqual(r.status_code, 202)
1105

    
1106
        # assert not allowed move for alice
1107
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1108
        r = self.put(url, data='', user='alice',
1109
                     HTTP_X_OBJECT_META_TEST='testcopy',
1110
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1111
                         self.container, self.object),
1112
                     HTTP_X_SOURCE_ACCOUNT='user')
1113
        self.assertEqual(r.status_code, 403)
1114

    
1115
        # assert no new object was created
1116
        r = self.head(url, user='alice')
1117
        self.assertEqual(r.status_code, 404)
1118

    
1119
        # assert not allowed move for dan
1120
        self.create_container(cname, user='dan')
1121
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1122
        r = self.put(url, data='', user='dan',
1123
                     HTTP_X_OBJECT_META_TEST='testcopy',
1124
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1125
                         self.container, self.object),
1126
                     HTTP_X_SOURCE_ACCOUNT='user')
1127
        self.assertEqual(r.status_code, 403)
1128

    
1129
        # assert no new object was created
1130
        r = self.head(url, user='dan')
1131
        self.assertEqual(r.status_code, 404)
1132

    
1133

    
1134
class ObjectCopy(PithosAPITest):
1135
    def setUp(self):
1136
        PithosAPITest.setUp(self)
1137
        self.container = 'c1'
1138
        self.create_container(self.container)
1139
        self.object, self.data = self.upload_object(self.container)[:-1]
1140

    
1141
        url = join_urls(
1142
            self.pithos_path, self.user, self.container, self.object)
1143
        r = self.head(url)
1144
        self.etag = r['X-Object-Hash']
1145

    
1146
    def test_copy(self):
1147
        with AssertMappingInvariant(self.get_object_info, self.container,
1148
                                    self.object):
1149
            oname = get_random_name()
1150
            # copy object
1151
            url = join_urls(self.pithos_path, self.user, self.container,
1152
                            self.object)
1153
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1154
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1155
                                                       oname))
1156
            # assert copy success
1157
            url = join_urls(self.pithos_path, self.user, self.container,
1158
                            oname)
1159
            self.assertEqual(r.status_code, 201)
1160

    
1161
            # assert access the new object
1162
            r = self.head(url)
1163
            self.assertEqual(r.status_code, 200)
1164
            self.assertTrue('X-Object-Meta-Test' in r)
1165
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1166

    
1167
            # assert etag is the same
1168
            self.assertTrue('X-Object-Hash' in r)
1169
            self.assertEqual(r['X-Object-Hash'], self.etag)
1170

    
1171
            # assert source object still exists
1172
            url = join_urls(self.pithos_path, self.user, self.container,
1173
                            self.object)
1174
            r = self.head(url)
1175
            self.assertEqual(r.status_code, 200)
1176

    
1177
            # assert etag is the same
1178
            self.assertTrue('X-Object-Hash' in r)
1179
            self.assertEqual(r['X-Object-Hash'], self.etag)
1180

    
1181
            r = self.get(url)
1182
            self.assertEqual(r.status_code, 200)
1183

    
1184
            # assert etag is the same
1185
            self.assertTrue('X-Object-Hash' in r)
1186
            self.assertEqual(r['X-Object-Hash'], self.etag)
1187

    
1188
            # copy object to other container (not existing)
1189
            cname = get_random_name()
1190
            url = join_urls(self.pithos_path, self.user, self.container,
1191
                            self.object)
1192
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1193
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1194

    
1195
            # assert destination container does not exist
1196
            url = join_urls(self.pithos_path, self.user, cname,
1197
                            self.object)
1198
            self.assertEqual(r.status_code, 404)
1199

    
1200
            # create container
1201
            self.create_container(cname)
1202

    
1203
            # copy object to other container (existing)
1204
            url = join_urls(self.pithos_path, self.user, self.container,
1205
                            self.object)
1206
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1207
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1208

    
1209
            # assert copy success
1210
            url = join_urls(self.pithos_path, self.user, cname,
1211
                            self.object)
1212
            self.assertEqual(r.status_code, 201)
1213

    
1214
            # assert access the new object
1215
            r = self.head(url)
1216
            self.assertEqual(r.status_code, 200)
1217
            self.assertTrue('X-Object-Meta-Test' in r)
1218
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1219

    
1220
            # assert etag is the same
1221
            self.assertTrue('X-Object-Hash' in r)
1222
            self.assertEqual(r['X-Object-Hash'], self.etag)
1223

    
1224
            # assert source object still exists
1225
            url = join_urls(self.pithos_path, self.user, self.container,
1226
                            self.object)
1227
            r = self.head(url)
1228
            self.assertEqual(r.status_code, 200)
1229

    
1230
            # assert etag is the same
1231
            self.assertTrue('X-Object-Hash' in r)
1232
            self.assertEqual(r['X-Object-Hash'], self.etag)
1233

    
1234
            r = self.get(url)
1235
            self.assertEqual(r.status_code, 200)
1236

    
1237
            # assert etag is the same
1238
            self.assertTrue('X-Object-Hash' in r)
1239
            self.assertEqual(r['X-Object-Hash'], self.etag)
1240

    
1241
    @pithos_test_settings(API_LIST_LIMIT=10)
1242
    def test_copy_dir_contents(self):
1243
        folder = self.create_folder(self.container)[0]
1244
        subfolder = self.create_folder(
1245
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1246
        objects = [subfolder]
1247
        append = objects.append
1248
        for i in range(11):
1249
            append(self.upload_object(self.container,
1250
                                      '%s/%d' % (folder, i),
1251
                                      depth='1')[0])
1252
        other = self.upload_object(self.container, strnextling(folder))[0]
1253

    
1254
        # copy dir
1255
        url = join_urls(self.pithos_path, self.user, self.container,
1256
                        folder)
1257
        copy_folder = self.create_folder(self.container)[0]
1258
        r = self.copy('%s?delimiter=/' % url,
1259
                      HTTP_X_OBJECT_META_TEST='testcopy',
1260
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1261
                                                   copy_folder))
1262
        self.assertEqual(r.status_code, 201)
1263

    
1264
        for obj in objects:
1265
            # assert object exists
1266
            url = join_urls(self.pithos_path, self.user, self.container,
1267
                            obj.replace(folder, copy_folder))
1268
            r = self.head(url)
1269
            self.assertEqual(r.status_code, 200)
1270

    
1271
        # assert other has not been created under copy folder
1272
        url = join_urls(self.pithos_path, self.user, self.container,
1273
                        '%s/%s' % (copy_folder,
1274
                                   other.replace(folder, copy_folder)))
1275
        r = self.head(url)
1276
        self.assertEqual(r.status_code, 404)
1277

    
1278
    def test_copy_to_other_account(self):
1279
        # create a container under alice account
1280
        cname = self.create_container(user='alice')[0]
1281

    
1282
        # create a folder under this container
1283
        folder = self.create_folder(cname, user='alice')[0]
1284

    
1285
        oname = get_random_name()
1286

    
1287
        # copy object to other account container
1288
        url = join_urls(self.pithos_path, self.user, self.container,
1289
                        self.object)
1290
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1291
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1292
                      HTTP_DESTINATION_ACCOUNT='alice')
1293
        self.assertEqual(r.status_code, 403)
1294

    
1295
        # share object for read with user
1296
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1297
        r = self.post(url, user='alice', content_type='',
1298
                      HTTP_CONTENT_RANGE='bytes */*',
1299
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1300
        self.assertEqual(r.status_code, 202)
1301

    
1302
        # assert copy object still is not allowed
1303
        url = join_urls(self.pithos_path, self.user, self.container,
1304
                        self.object)
1305
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1306
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1307
                      HTTP_DESTINATION_ACCOUNT='alice')
1308
        self.assertEqual(r.status_code, 403)
1309

    
1310
        # share object for write with user
1311
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1312
        r = self.post(url, user='alice',  content_type='',
1313
                      HTTP_CONTENT_RANGE='bytes */*',
1314
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1315
        self.assertEqual(r.status_code, 202)
1316

    
1317
        # assert copy object now is allowed
1318
        url = join_urls(self.pithos_path, self.user, self.container,
1319
                        self.object)
1320
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1321
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1322
                      HTTP_DESTINATION_ACCOUNT='alice')
1323
        self.assertEqual(r.status_code, 201)
1324

    
1325
        # assert access the new object
1326
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1327
        r = self.head(url, user='alice')
1328
        self.assertEqual(r.status_code, 200)
1329
        self.assertTrue('X-Object-Meta-Test' in r)
1330
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1331

    
1332
        # assert etag is the same
1333
        self.assertTrue('X-Object-Hash' in r)
1334
        self.assertEqual(r['X-Object-Hash'], self.etag)
1335

    
1336
        # assert source object still exists
1337
        url = join_urls(self.pithos_path, self.user, self.container,
1338
                        self.object)
1339
        r = self.head(url)
1340
        self.assertEqual(r.status_code, 200)
1341

    
1342
        # assert etag is the same
1343
        self.assertTrue('X-Object-Hash' in r)
1344
        self.assertEqual(r['X-Object-Hash'], self.etag)
1345

    
1346
        r = self.get(url)
1347
        self.assertEqual(r.status_code, 200)
1348

    
1349
        # assert etag is the same
1350
        self.assertTrue('X-Object-Hash' in r)
1351
        self.assertEqual(r['X-Object-Hash'], self.etag)
1352

    
1353

    
1354
class ObjectMove(PithosAPITest):
1355
    def setUp(self):
1356
        PithosAPITest.setUp(self)
1357
        self.container = 'c1'
1358
        self.create_container(self.container)
1359
        self.object, self.data = self.upload_object(self.container)[:-1]
1360

    
1361
        url = join_urls(
1362
            self.pithos_path, self.user, self.container, self.object)
1363
        r = self.head(url)
1364
        self.etag = r['X-Object-Hash']
1365

    
1366
    def test_move(self):
1367
        oname = get_random_name()
1368

    
1369
        # move object
1370
        url = join_urls(self.pithos_path, self.user, self.container,
1371
                        self.object)
1372
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1373
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1374
                                                   oname))
1375
        # assert move success
1376
        url = join_urls(self.pithos_path, self.user, self.container,
1377
                        oname)
1378
        self.assertEqual(r.status_code, 201)
1379

    
1380
        # assert access the new object
1381
        r = self.head(url)
1382
        self.assertEqual(r.status_code, 200)
1383
        self.assertTrue('X-Object-Meta-Test' in r)
1384
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1385

    
1386
        # assert etag is the same
1387
        self.assertTrue('X-Object-Hash' in r)
1388
        self.assertEqual(r['X-Object-Hash'], self.etag)
1389

    
1390
        # assert source object does not exist
1391
        url = join_urls(self.pithos_path, self.user, self.container,
1392
                        self.object)
1393
        r = self.head(url)
1394
        self.assertEqual(r.status_code, 404)
1395

    
1396
    @pithos_test_settings(API_LIST_LIMIT=10)
1397
    def test_move_dir_contents(self):
1398
        folder = self.create_folder(self.container)[0]
1399
        subfolder = self.create_folder(
1400
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1401
        objects = [subfolder]
1402
        append = objects.append
1403
        for i in range(11):
1404
            append(self.upload_object(self.container,
1405
                                      '%s/%d' % (folder, i),
1406
                                      depth='1')[0])
1407
        other = self.upload_object(self.container, strnextling(folder))[0]
1408

    
1409
        # copy dir
1410
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1411
        copy_folder = self.create_folder(self.container)[0]
1412
        r = self.move('%s?delimiter=/' % url,
1413
                      HTTP_X_OBJECT_META_TEST='testcopy',
1414
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1415
                                                   copy_folder))
1416
        self.assertEqual(r.status_code, 201)
1417

    
1418
        for obj in objects:
1419
            # assert object exists
1420
            url = join_urls(self.pithos_path, self.user, self.container,
1421
                            obj.replace(folder, copy_folder))
1422
            r = self.head(url)
1423
            self.assertEqual(r.status_code, 200)
1424

    
1425
        # assert other has not been created under copy folder
1426
        url = join_urls(self.pithos_path, self.user, self.container,
1427
                        '%s/%s' % (copy_folder,
1428
                                   other.replace(folder, copy_folder)))
1429
        r = self.head(url)
1430
        self.assertEqual(r.status_code, 404)
1431

    
1432
    def test_move_to_other_container(self):
1433
        # move object to other container (not existing)
1434
        cname = get_random_name()
1435
        url = join_urls(self.pithos_path, self.user, self.container,
1436
                        self.object)
1437
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1438
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1439

    
1440
        # assert destination container does not exist
1441
        url = join_urls(self.pithos_path, self.user, cname,
1442
                        self.object)
1443
        self.assertEqual(r.status_code, 404)
1444

    
1445
        # create container
1446
        self.create_container(cname)
1447

    
1448
        # move object to other container (existing)
1449
        url = join_urls(self.pithos_path, self.user, self.container,
1450
                        self.object)
1451
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1452
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1453

    
1454
        # assert move success
1455
        url = join_urls(self.pithos_path, self.user, cname,
1456
                        self.object)
1457
        self.assertEqual(r.status_code, 201)
1458

    
1459
        # assert access the new object
1460
        r = self.head(url)
1461
        self.assertEqual(r.status_code, 200)
1462
        self.assertTrue('X-Object-Meta-Test' in r)
1463
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1464

    
1465
        # assert etag is the same
1466
        self.assertTrue('X-Object-Hash' in r)
1467
        self.assertEqual(r['X-Object-Hash'], self.etag)
1468

    
1469
        # assert source object does not exist
1470
        url = join_urls(self.pithos_path, self.user, self.container,
1471
                        self.object)
1472
        r = self.head(url)
1473
        self.assertEqual(r.status_code, 404)
1474

    
1475
    def test_move_to_other_account(self):
1476
        # create a container under alice account
1477
        cname = self.create_container(user='alice')[0]
1478

    
1479
        # create a folder under this container
1480
        folder = self.create_folder(cname, user='alice')[0]
1481

    
1482
        oname = get_random_name()
1483

    
1484
        # move object to other account container
1485
        url = join_urls(self.pithos_path, self.user, self.container,
1486
                        self.object)
1487
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1488
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1489
                      HTTP_DESTINATION_ACCOUNT='alice')
1490
        self.assertEqual(r.status_code, 403)
1491

    
1492
        # share object for read with user
1493
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1494
        r = self.post(url, user='alice', content_type='',
1495
                      HTTP_CONTENT_RANGE='bytes */*',
1496
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1497
        self.assertEqual(r.status_code, 202)
1498

    
1499
        # assert move object still is not allowed
1500
        url = join_urls(self.pithos_path, self.user, self.container,
1501
                        self.object)
1502
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1503
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1504
                      HTTP_DESTINATION_ACCOUNT='alice')
1505
        self.assertEqual(r.status_code, 403)
1506

    
1507
        # share object for write with user
1508
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1509
        r = self.post(url, user='alice',  content_type='',
1510
                      HTTP_CONTENT_RANGE='bytes */*',
1511
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1512
        self.assertEqual(r.status_code, 202)
1513

    
1514
        # assert move object now is allowed
1515
        url = join_urls(self.pithos_path, self.user, self.container,
1516
                        self.object)
1517
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1518
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1519
                      HTTP_DESTINATION_ACCOUNT='alice')
1520
        self.assertEqual(r.status_code, 201)
1521

    
1522
        # assert source object does not exist
1523
        url = join_urls(self.pithos_path, self.user, self.container,
1524
                        self.object)
1525
        r = self.head(url)
1526
        self.assertEqual(r.status_code, 404)
1527

    
1528

    
1529
class ObjectPost(PithosAPITest):
1530
    def setUp(self):
1531
        PithosAPITest.setUp(self)
1532
        self.container = 'c1'
1533
        self.create_container(self.container)
1534
        self.object, self.object_data = self.upload_object(self.container)[:2]
1535

    
1536
    def test_update_meta(self):
1537
        with AssertUUidInvariant(self.get_object_info,
1538
                                 self.container,
1539
                                 self.object):
1540
            # update metadata
1541
            d = {'a' * 114: 'b' * 256}
1542
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1543
                          k, v in d.items())
1544
            url = join_urls(self.pithos_path, self.user, self.container,
1545
                            self.object)
1546
            r = self.post(url, content_type='', **kwargs)
1547
            self.assertEqual(r.status_code, 202)
1548

    
1549
            # assert metadata have been updated
1550
            meta = self.get_object_meta(self.container, self.object)
1551

    
1552
            for k, v in d.items():
1553
                self.assertTrue(k.title() in meta)
1554
                self.assertTrue(meta[k.title()], v)
1555

    
1556
            # Header key too large
1557
            d = {'a' * 115: 'b' * 256}
1558
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1559
                          k, v in d.items())
1560
            r = self.post(url, content_type='', **kwargs)
1561
            self.assertEqual(r.status_code, 400)
1562

    
1563
            # Header value too large
1564
            d = {'a' * 114: 'b' * 257}
1565
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1566
                          k, v in d.items())
1567
            r = self.post(url, content_type='', **kwargs)
1568
            self.assertEqual(r.status_code, 400)
1569

    
1570
#            # Check utf-8 meta
1571
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1572
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1573
#                          k, v in d.items())
1574
#            url = join_urls(self.pithos_path, self.user, self.container,
1575
#                            self.object)
1576
#            r = self.post(url, content_type='', **kwargs)
1577
#            self.assertEqual(r.status_code, 202)
1578
#
1579
#            # assert metadata have been updated
1580
#            meta = self.get_object_meta(self.container, self.object)
1581
#
1582
#            for k, v in d.items():
1583
#                key = 'X-Object-Meta-%s' % k.title()
1584
#                self.assertTrue(key in meta)
1585
#                self.assertTrue(meta[key], v)
1586
#
1587
#            # Header key too large
1588
#            d = {'α' * 114: 'β' * (256 / 2)}
1589
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1590
#                          k, v in d.items())
1591
#            r = self.post(url, content_type='', **kwargs)
1592
#            self.assertEqual(r.status_code, 400)
1593
#
1594
#            # Header value too large
1595
#            d = {'α' * 114: 'β' * 256}
1596
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1597
#                          k, v in d.items())
1598
#            r = self.udpate(url, content_type='', **kwargs)
1599
#            self.assertEqual(r.status_code, 400)
1600

    
1601
    def test_update_object(self):
1602
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1603
        oname, odata = self.upload_object(
1604
            self.container, length=random.randint(
1605
                block_size + 2, 2 * block_size))[:2]
1606

    
1607
        length = len(odata)
1608
        first_byte_pos = random.randint(1, block_size)
1609
        last_byte_pos = random.randint(block_size + 1, length - 1)
1610
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1611
        kwargs = {'content_type': 'application/octet-stream',
1612
                  'HTTP_CONTENT_RANGE': range}
1613

    
1614
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1615
        partial = last_byte_pos - first_byte_pos + 1
1616
        data = get_random_data(partial)
1617
        r = self.post(url, data=data, **kwargs)
1618

    
1619
        self.assertEqual(r.status_code, 204)
1620
        self.assertTrue('ETag' in r)
1621
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1622
                                     data)
1623
        if pithos_settings.UPDATE_MD5:
1624
            etag = md5_hash(updated_data)
1625
        else:
1626
            etag = merkle(updated_data)
1627
        #self.assertEqual(r['ETag'], etag)
1628

    
1629
        # check modified object
1630
        r = self.get(url)
1631

    
1632
        self.assertEqual(r.status_code, 200)
1633
        self.assertEqual(r.content, updated_data)
1634
        self.assertEqual(etag, r['ETag'])
1635

    
1636
    def test_update_object_divided_by_blocksize(self):
1637
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1638
        oname, odata = self.upload_object(self.container,
1639
                                          length=2 * block_size)[:2]
1640

    
1641
        length = len(odata)
1642
        first_byte_pos = block_size
1643
        last_byte_pos = 2 * block_size - 1
1644
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1645
        kwargs = {'content_type': 'application/octet-stream',
1646
                  'HTTP_CONTENT_RANGE': range}
1647

    
1648
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1649
        partial = last_byte_pos - first_byte_pos + 1
1650
        data = get_random_data(partial)
1651
        r = self.post(url, data=data, **kwargs)
1652

    
1653
        self.assertEqual(r.status_code, 204)
1654
        self.assertTrue('ETag' in r)
1655
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1656
                                     data)
1657
        if pithos_settings.UPDATE_MD5:
1658
            etag = md5_hash(updated_data)
1659
        else:
1660
            etag = merkle(updated_data)
1661
        #self.assertEqual(r['ETag'], etag)
1662

    
1663
        # check modified object
1664
        r = self.get(url)
1665

    
1666
        self.assertEqual(r.status_code, 200)
1667
        self.assertEqual(r.content, updated_data)
1668
        self.assertEqual(etag, r['ETag'])
1669

    
1670
    def test_update_object_invalid_content_length(self):
1671
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1672
        oname, odata = self.upload_object(
1673
            self.container, length=random.randint(
1674
                block_size + 1, 2 * block_size))[:2]
1675

    
1676
        length = len(odata)
1677
        first_byte_pos = random.randint(1, block_size)
1678
        last_byte_pos = random.randint(block_size + 1, length - 1)
1679
        partial = last_byte_pos - first_byte_pos + 1
1680
        data = get_random_data(partial)
1681
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1682
        kwargs = {'content_type': 'application/octet-stream',
1683
                  'HTTP_CONTENT_RANGE': range,
1684
                  'CONTENT_LENGTH': str(partial + 1)}
1685

    
1686
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1687
        r = self.post(url, data=data, **kwargs)
1688

    
1689
        self.assertEqual(r.status_code, 400)
1690

    
1691
    def test_update_object_invalid_range(self):
1692
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1693
        oname, odata = self.upload_object(
1694
            self.container, length=random.randint(block_size + 1,
1695
                                                  2 * block_size))[:2]
1696

    
1697
        length = len(odata)
1698
        first_byte_pos = random.randint(1, block_size)
1699
        last_byte_pos = first_byte_pos - 1
1700
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1701
        kwargs = {'content_type': 'application/octet-stream',
1702
                  'HTTP_CONTENT_RANGE': range}
1703

    
1704
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1705
        r = self.post(url, data=get_random_data(), **kwargs)
1706

    
1707
        self.assertEqual(r.status_code, 416)
1708

    
1709
    def test_update_object_out_of_limits(self):
1710
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1711
        oname, odata = self.upload_object(
1712
            self.container, length=random.randint(block_size + 1,
1713
                                                  2 * block_size))[:2]
1714

    
1715
        length = len(odata)
1716
        first_byte_pos = random.randint(1, block_size)
1717
        last_byte_pos = length + 1
1718
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1719
        kwargs = {'content_type': 'application/octet-stream',
1720
                  'HTTP_CONTENT_RANGE': range}
1721

    
1722
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1723
        r = self.post(url, data=get_random_data(), **kwargs)
1724

    
1725
        self.assertEqual(r.status_code, 416)
1726

    
1727
    def test_append(self):
1728
        data = get_random_data()
1729
        length = len(data)
1730
        url = join_urls(self.pithos_path, self.user, self.container,
1731
                        self.object)
1732
        r = self.post(url, data=data, content_type='application/octet-stream',
1733
                      HTTP_CONTENT_LENGTH=str(length),
1734
                      HTTP_CONTENT_RANGE='bytes */*')
1735
        self.assertEqual(r.status_code, 204)
1736

    
1737
        r = self.get(url)
1738
        content = r.content
1739
        self.assertEqual(len(content), len(self.object_data) + length)
1740
        self.assertEqual(content, self.object_data + data)
1741

    
1742
    # TODO Fix the test
1743
    def _test_update_with_chunked_transfer(self):
1744
        data = get_random_data()
1745
        length = len(data)
1746

    
1747
        url = join_urls(self.pithos_path, self.user, self.container,
1748
                        self.object)
1749
        r = self.post(url, data=data, content_type='application/octet-stream',
1750
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1751
                      HTTP_TRANSFER_ENCODING='chunked')
1752
        self.assertEqual(r.status_code, 204)
1753

    
1754
        # check modified object
1755
        r = self.get(url)
1756
        content = r.content
1757
        self.assertEqual(content[0:length], data)
1758
        self.assertEqual(content[length:], self.object_data[length:])
1759

    
1760
    def test_update_from_other_object(self):
1761
        src = self.object
1762
        dest = get_random_name()
1763

    
1764
        url = join_urls(self.pithos_path, self.user, self.container, src)
1765
        r = self.get(url)
1766
        source_data = r.content
1767
        source_meta = self.get_object_info(self.container, src)
1768

    
1769
        # update zero length object
1770
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1771
        r = self.put(url, data='')
1772
        self.assertEqual(r.status_code, 201)
1773

    
1774
        r = self.post(url,
1775
                      HTTP_CONTENT_RANGE='bytes */*',
1776
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1777
        self.assertEqual(r.status_code, 204)
1778

    
1779
        r = self.get(url)
1780
        dest_data = r.content
1781
        dest_meta = self.get_object_info(self.container, dest)
1782

    
1783
        self.assertEqual(source_data, dest_data)
1784
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1785
        self.assertEqual(source_meta['X-Object-Hash'],
1786
                         dest_meta['X-Object-Hash'])
1787
        self.assertTrue(
1788
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1789

    
1790
    def test_update_range_from_other_object(self):
1791
        src = self.object
1792
        dest = get_random_name()
1793

    
1794
        url = join_urls(self.pithos_path, self.user, self.container, src)
1795
        r = self.get(url)
1796
        source_data = r.content
1797
        source_length = len(source_data)
1798

    
1799
        # update zero length object
1800
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1801
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1802
        initial_data = get_random_data(
1803
            length=source_length + random.randint(0, block_size))
1804

    
1805
        r = self.put(url, data=initial_data)
1806
        self.assertEqual(r.status_code, 201)
1807

    
1808
        offset = random.randint(0, source_length - 1)
1809
        upto = random.randint(offset, source_length - 1)
1810
        r = self.post(url,
1811
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1812
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1813
        self.assertEqual(r.status_code, 204)
1814

    
1815
        r = self.get(url)
1816
        content = r.content
1817
        self.assertEqual(content, (initial_data[:offset] +
1818
                                   source_data[:upto - offset + 1] +
1819
                                   initial_data[upto + 1:]))
1820

    
1821
    def test_update_range_from_invalid_other_object(self):
1822
        src = self.object
1823
        dest = get_random_name()
1824

    
1825
        url = join_urls(self.pithos_path, self.user, self.container, src)
1826
        r = self.get(url)
1827

    
1828
        # update zero length object
1829
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1830
        initial_data = get_random_data()
1831
        length = len(initial_data)
1832
        r = self.put(url, data=initial_data)
1833
        self.assertEqual(r.status_code, 201)
1834

    
1835
        offset = random.randint(1, length - 2)
1836
        upto = random.randint(offset, length - 1)
1837

    
1838
        # source object does not start with /
1839
        r = self.post(url,
1840
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1841
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1842
        self.assertEqual(r.status_code, 400)
1843

    
1844
        # source object does not exist
1845
        r = self.post(url,
1846
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1847
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1848
        self.assertEqual(r.status_code, 404)
1849

    
1850
    def test_restore_version(self):
1851
        info = self.get_object_info(self.container, self.object)
1852
        v = []
1853
        append = v.append
1854
        append((info['X-Object-Version'],
1855
                int(info['Content-Length']),
1856
                self.object_data))
1857

    
1858
        # update object
1859
        data, r = self.upload_object(self.container, self.object,
1860
                                     length=v[0][1] - 1)[1:]
1861
        self.assertTrue('X-Object-Version' in r)
1862
        append((r['X-Object-Version'], len(data), data))
1863
        # v[0][1] > v[1][1]
1864

    
1865
        # update with the previous version
1866
        url = join_urls(self.pithos_path, self.user, self.container,
1867
                        self.object)
1868
        r = self.post(url,
1869
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1870
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1871
                                                       self.object),
1872
                      HTTP_X_SOURCE_VERSION=v[0][0],
1873
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1874
        self.assertEqual(r.status_code, 204)
1875
        # v[2][1] = v[0][1] > v[1][1]
1876

    
1877
        # check content
1878
        r = self.get(url)
1879
        content = r.content
1880
        self.assertEqual(len(content), v[0][1])
1881
        self.assertEqual(content, self.object_data)
1882
        append((r['X-Object-Version'], len(content), content))
1883

    
1884
        # update object content(v4) > content(v2)
1885
        data, r = self.upload_object(self.container, self.object,
1886
                                     length=v[2][1] + 1)[1:]
1887
        self.assertTrue('X-Object-Version' in r)
1888
        append((r['X-Object-Version'], len(data), data))
1889
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1890

    
1891
        # update with the previous version
1892
        r = self.post(url,
1893
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1894
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1895
                                                       self.object),
1896
                      HTTP_X_SOURCE_VERSION=v[2][0],
1897
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1898
        self.assertEqual(r.status_code, 204)
1899
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1900

    
1901
        # check content
1902
        r = self.get(url)
1903
        data = r.content
1904
        self.assertEqual(data, v[2][2])
1905
        append((r['X-Object-Version'], len(data), data))
1906

    
1907
    def test_update_from_other_version(self):
1908
        versions = []
1909
        info = self.get_object_info(self.container, self.object)
1910
        versions.append(info['X-Object-Version'])
1911
        pre_length = int(info['Content-Length'])
1912

    
1913
        # update object
1914
        d1, r = self.upload_object(self.container, self.object,
1915
                                   length=pre_length - 1)[1:]
1916
        self.assertTrue('X-Object-Version' in r)
1917
        versions.append(r['X-Object-Version'])
1918

    
1919
        # update object
1920
        d2, r = self.upload_object(self.container, self.object,
1921
                                   length=pre_length - 2)[1:]
1922
        self.assertTrue('X-Object-Version' in r)
1923
        versions.append(r['X-Object-Version'])
1924

    
1925
        # get previous version
1926
        url = join_urls(self.pithos_path, self.user, self.container,
1927
                        self.object)
1928
        r = self.get('%s?version=list&format=json' % url)
1929
        self.assertEqual(r.status_code, 200)
1930
        l = json.loads(r.content)['versions']
1931
        self.assertEqual(len(l), 3)
1932
        self.assertEqual([str(v[0]) for v in l], versions)
1933

    
1934
        # update with the previous version
1935
        r = self.post(url,
1936
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1937
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1938
                                                       self.object),
1939
                      HTTP_X_SOURCE_VERSION=versions[0])
1940
        self.assertEqual(r.status_code, 204)
1941

    
1942
        # check content
1943
        r = self.get(url)
1944
        content = r.content
1945
        self.assertEqual(len(content), pre_length)
1946
        self.assertEqual(content, self.object_data)
1947

    
1948
        # update object
1949
        d3, r = self.upload_object(self.container, self.object,
1950
                                   length=len(d2) + 1)[1:]
1951
        self.assertTrue('X-Object-Version' in r)
1952
        versions.append(r['X-Object-Version'])
1953

    
1954
        # update with the previous version
1955
        r = self.post(url,
1956
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1957
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1958
                                                       self.object),
1959
                      HTTP_X_SOURCE_VERSION=versions[-2])
1960
        self.assertEqual(r.status_code, 204)
1961

    
1962
        # check content
1963
        r = self.get(url)
1964
        content = r.content
1965
        self.assertEqual(content, d2 + d3[-1])
1966

    
1967
    def test_update_invalid_permissions(self):
1968
        url = join_urls(self.pithos_path, self.user, self.container,
1969
                        self.object)
1970
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1971
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1972
        self.assertEqual(r.status_code, 400)
1973

    
1974
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1975
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1976
        self.assertEqual(r.status_code, 400)
1977

    
1978
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1979
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1980
        self.assertEqual(r.status_code, 400)
1981

    
1982

    
1983
class ObjectDelete(PithosAPITest):
1984
    def setUp(self):
1985
        PithosAPITest.setUp(self)
1986
        self.container = 'c1'
1987
        self.create_container(self.container)
1988
        self.object, self.object_data = self.upload_object(self.container)[:2]
1989

    
1990
    def test_delete(self):
1991
        url = join_urls(self.pithos_path, self.user, self.container,
1992
                        self.object)
1993
        r = self.delete(url)
1994
        self.assertEqual(r.status_code, 204)
1995

    
1996
        r = self.head(url)
1997
        self.assertEqual(r.status_code, 404)
1998

    
1999
    def test_delete_non_existent(self):
2000
        url = join_urls(self.pithos_path, self.user, self.container,
2001
                        get_random_name())
2002
        r = self.delete(url)
2003
        self.assertEqual(r.status_code, 404)
2004

    
2005
    @pithos_test_settings(API_LIST_LIMIT=10)
2006
    def test_delete_dir(self):
2007
        folder = self.create_folder(self.container)[0]
2008
        subfolder = self.create_folder(
2009
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
2010
        objects = [subfolder]
2011
        append = objects.append
2012
        for i in range(11):
2013
            append(self.upload_object(self.container,
2014
                                      '%s/%d' % (folder, i),
2015
                                      depth='1')[0])
2016
        other = self.upload_object(self.container, strnextling(folder))[0]
2017

    
2018
        # move dir
2019
        url = join_urls(self.pithos_path, self.user, self.container, folder)
2020
        r = self.delete('%s?delimiter=/' % url)
2021
        self.assertEqual(r.status_code, 204)
2022

    
2023
        for obj in objects:
2024
            # assert object does not exist
2025
            url = join_urls(self.pithos_path, self.user, self.container, obj)
2026
            r = self.head(url)
2027
            self.assertEqual(r.status_code, 404)
2028

    
2029
        # assert other has not been deleted
2030
        url = join_urls(self.pithos_path, self.user, self.container, other)
2031
        r = self.head(url)
2032
        self.assertEqual(r.status_code, 200)