Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 474e609a

History | View | Annotate | Download (78.1 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote, unquote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS, pithos_test_settings)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def test_get_object_meta(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, self.user, cname, oname)
68
        r = self.head(url)
69

    
70
        mandatory = ['Etag',
71
                     'Content-Length',
72
                     'Content-Type',
73
                     'Last-Modified',
74
                     'X-Object-Hash',
75
                     'X-Object-UUID',
76
                     'X-Object-Version',
77
                     'X-Object-Version-Timestamp',
78
                     'X-Object-Modified-By']
79
        for i in mandatory:
80
            self.assertTrue(i in r)
81

    
82
        r = self.post(url, content_type='',
83
                      HTTP_CONTENT_ENCODING='gzip',
84
                      HTTP_CONTENT_DISPOSITION=(
85
                          'attachment; filename="%s"' % oname))
86
        self.assertEqual(r.status_code, 202)
87

    
88
        r = self.head(url)
89
        for i in mandatory:
90
            self.assertTrue(i in r)
91
        self.assertTrue('Content-Encoding' in r)
92
        self.assertEqual(r['Content-Encoding'], 'gzip')
93
        self.assertTrue('Content-Disposition' in r)
94
        self.assertEqual(unquote(r['Content-Disposition']),
95
                         'attachment; filename="%s"' % oname)
96

    
97
        prefix = 'myobject/'
98
        data = ''
99
        for i in range(random.randint(2, 10)):
100
            part = '%s%d' % (prefix, i)
101
            data += self.upload_object(cname, oname=part)[1]
102

    
103
        manifest = '%s/%s' % (cname, prefix)
104
        oname = get_random_name()
105
        url = join_urls(self.pithos_path, self.user, cname, oname)
106
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107
        self.assertEqual(r.status_code, 201)
108

    
109
        r = self.head(url)
110
        for i in mandatory:
111
            self.assertTrue(i in r)
112
        self.assertTrue('X-Object-Manifest' in r)
113
        self.assertEqual(r['X-Object-Manifest'], manifest)
114

    
115

    
116
class ObjectGet(PithosAPITest):
117
    def setUp(self):
118
        PithosAPITest.setUp(self)
119
        self.containers = ['c1', 'c2']
120

    
121
        # create some containers
122
        for c in self.containers:
123
            self.create_container(c)
124

    
125
        # upload files
126
        self.objects = defaultdict(list)
127
        self.objects['c1'].append(self.upload_object('c1')[0])
128

    
129
    def test_versions(self):
130
        c = 'c1'
131
        o = self.objects[c][0]
132
        url = join_urls(self.pithos_path, self.user, c, o)
133

    
134
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
135
        r = self.post(url, content_type='', **meta)
136
        self.assertEqual(r.status_code, 202)
137

    
138
        url = join_urls(self.pithos_path, self.user, c, o)
139
        r = self.get('%s?version=list&format=json' % url)
140
        self.assertEqual(r.status_code, 200)
141
        l1 = json.loads(r.content)['versions']
142
        self.assertEqual(len(l1), 2)
143

    
144
        # update meta
145
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
146
                'HTTP_X_OBJECT_META_STOCK': 'True'}
147
        r = self.post(url, content_type='', **meta)
148
        self.assertEqual(r.status_code, 202)
149

    
150
        # assert a newly created version has been created
151
        r = self.get('%s?version=list&format=json' % url)
152
        self.assertEqual(r.status_code, 200)
153
        l2 = json.loads(r.content)['versions']
154
        self.assertEqual(len(l2), len(l1) + 1)
155
        self.assertEqual(l2[:-1], l1)
156

    
157
        vserial, _ = l2[-2]
158
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
159
                         {'Quality': 'AAA'})
160

    
161
        # update data
162
        self.append_object_data(c, o)
163

    
164
        # assert a newly created version has been created
165
        r = self.get('%s?version=list&format=json' % url)
166
        self.assertEqual(r.status_code, 200)
167
        l3 = json.loads(r.content)['versions']
168
        self.assertEqual(len(l3), len(l2) + 1)
169
        self.assertEqual(l3[:-1], l2)
170

    
171
    def test_get_version(self):
172
        c = 'c1'
173
        o = self.objects[c][0]
174
        url = join_urls(self.pithos_path, self.user, c, o)
175

    
176
        # Update metadata
177
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
178
        r = self.post(url, content_type='', **meta)
179
        self.assertEqual(r.status_code, 202)
180

    
181
        url = join_urls(self.pithos_path, self.user, c, o)
182
        r = self.get('%s?version=list&format=json' % url)
183
        self.assertEqual(r.status_code, 200)
184
        l = json.loads(r.content)['versions']
185
        self.assertEqual(len(l), 2)
186

    
187
        r = self.head('%s?version=%s' % (url, l[0][0]))
188
        self.assertEqual(r.status_code, 200)
189
        self.assertTrue('X-Object-Meta-Quality' not in r)
190

    
191
        r = self.head('%s?version=%s' % (url, l[1][0]))
192
        self.assertEqual(r.status_code, 200)
193
        self.assertTrue('X-Object-Meta-Quality' in r)
194

    
195
        # test invalid version
196
        r = self.head('%s?version=-1' % url)
197
        self.assertEqual(r.status_code, 404)
198

    
199
        other_name, other_data, r = self.upload_object(c)
200
        self.assertTrue('X-Object-Version' in r)
201
        other_version = r['X-Object-Version']
202

    
203
        self.assertTrue(o != other_name)
204

    
205
        r = self.get('%s?version=%s' % (url, other_version))
206
        self.assertEqual(r.status_code, 404)
207

    
208
        r = self.head('%s?version=%s' % (url, other_version))
209
        self.assertEqual(r.status_code, 404)
210

    
211
    def test_objects_with_trailing_spaces(self):
212
        # create object
213
        oname = self.upload_object('c1')[0]
214
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
215

    
216
        r = self.get(quote('%s ' % url))
217
        self.assertEqual(r.status_code, 404)
218

    
219
        # delete object
220
        self.delete(url)
221

    
222
        r = self.get(url)
223
        self.assertEqual(r.status_code, 404)
224

    
225
        # upload object with trailing space
226
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
227

    
228
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
229
        r = self.get(url)
230
        self.assertEqual(r.status_code, 200)
231

    
232
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
233
        r = self.get(url)
234
        self.assertEqual(r.status_code, 404)
235

    
236
    def test_get_partial(self):
237
        cname = self.containers[0]
238
        oname, odata = self.upload_object(cname, length=512)[:-1]
239
        url = join_urls(self.pithos_path, self.user, cname, oname)
240
        r = self.get(url, HTTP_RANGE='bytes=0-499')
241
        self.assertEqual(r.status_code, 206)
242
        data = r.content
243
        self.assertEqual(data, odata[:500])
244
        self.assertTrue('Content-Range' in r)
245
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
246
        self.assertTrue('Content-Type' in r)
247
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
248

    
249
    def test_get_final_500(self):
250
        cname = self.containers[0]
251
        oname, odata = self.upload_object(cname, length=512)[:-1]
252
        size = len(odata)
253
        url = join_urls(self.pithos_path, self.user, cname, oname)
254
        r = self.get(url, HTTP_RANGE='bytes=-500')
255
        self.assertEqual(r.status_code, 206)
256
        self.assertEqual(r.content, odata[-500:])
257
        self.assertTrue('Content-Range' in r)
258
        self.assertEqual(r['Content-Range'],
259
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
260
        self.assertTrue('Content-Type' in r)
261
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
262

    
263
    def test_get_rest(self):
264
        cname = self.containers[0]
265
        oname, odata = self.upload_object(cname, length=512)[:-1]
266
        size = len(odata)
267
        url = join_urls(self.pithos_path, self.user, cname, oname)
268
        offset = len(odata) - random.randint(1, 512)
269
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
270
        self.assertEqual(r.status_code, 206)
271
        self.assertEqual(r.content, odata[offset:])
272
        self.assertTrue('Content-Range' in r)
273
        self.assertEqual(r['Content-Range'],
274
                         'bytes %s-%s/%s' % (offset, size - 1, size))
275
        self.assertTrue('Content-Type' in r)
276
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
277

    
278
    def test_get_range_not_satisfiable(self):
279
        cname = self.containers[0]
280
        oname, odata = self.upload_object(cname, length=512)[:-1]
281
        url = join_urls(self.pithos_path, self.user, cname, oname)
282

    
283
        # TODO
284
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
285
        #self.assertEqual(r.status_code, 416)
286

    
287
        offset = len(odata) + 1
288
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
289
        self.assertEqual(r.status_code, 416)
290

    
291
    def test_multiple_range(self):
292
        cname = self.containers[0]
293
        oname, odata = self.upload_object(cname)[:-1]
294
        url = join_urls(self.pithos_path, self.user, cname, oname)
295

    
296
        l = ['0-499', '-500', '1000-']
297
        ranges = 'bytes=%s' % ','.join(l)
298
        r = self.get(url, HTTP_RANGE=ranges)
299
        self.assertEqual(r.status_code, 206)
300
        self.assertTrue('content-type' in r)
301
        p = re.compile(
302
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
303
            re.I)
304
        m = p.match(r['content-type'])
305
        if m is None:
306
            self.fail('Invalid multiple range content type')
307
        boundary = m.groupdict()['boundary']
308
        cparts = r.content.split('--%s' % boundary)[1:-1]
309

    
310
        # assert content parts length
311
        self.assertEqual(len(cparts), len(l))
312

    
313
        # for each content part assert headers
314
        i = 0
315
        for cpart in cparts:
316
            content = cpart.split('\r\n')
317
            headers = content[1:3]
318
            content_range = headers[0].split(': ')
319
            self.assertEqual(content_range[0], 'Content-Range')
320

    
321
            r = l[i].split('-')
322
            if not r[0] and not r[1]:
323
                pass
324
            elif not r[0]:
325
                start = len(odata) - int(r[1])
326
                end = len(odata)
327
            elif not r[1]:
328
                start = int(r[0])
329
                end = len(odata)
330
            else:
331
                start = int(r[0])
332
                end = int(r[1]) + 1
333
            fdata = odata[start:end]
334
            sdata = '\r\n'.join(content[4:-1])
335
            self.assertEqual(len(fdata), len(sdata))
336
            self.assertEquals(fdata, sdata)
337
            i += 1
338

    
339
    def test_multiple_range_not_satisfiable(self):
340
        # perform get with multiple range
341
        cname = self.containers[0]
342
        oname, odata = self.upload_object(cname)[:-1]
343
        out_of_range = len(odata) + 1
344
        l = ['0-499', '-500', '%d-' % out_of_range]
345
        ranges = 'bytes=%s' % ','.join(l)
346
        url = join_urls(self.pithos_path, self.user, cname, oname)
347
        r = self.get(url, HTTP_RANGE=ranges)
348
        self.assertEqual(r.status_code, 416)
349

    
350
    def test_get_if_match(self):
351
        cname = self.containers[0]
352
        oname, odata = self.upload_object(cname)[:-1]
353

    
354
        # perform get with If-Match
355
        url = join_urls(self.pithos_path, self.user, cname, oname)
356

    
357
        if pithos_settings.UPDATE_MD5:
358
            etag = md5_hash(odata)
359
        else:
360
            etag = merkle(odata)
361

    
362
        r = self.get(url, HTTP_IF_MATCH=etag)
363

    
364
        # assert get success
365
        self.assertEqual(r.status_code, 200)
366

    
367
        # assert response content
368
        self.assertEqual(r.content, odata)
369

    
370
    def test_get_if_match_star(self):
371
        cname = self.containers[0]
372
        oname, odata = self.upload_object(cname)[:-1]
373

    
374
        # perform get with If-Match *
375
        url = join_urls(self.pithos_path, self.user, cname, oname)
376
        r = self.get(url, HTTP_IF_MATCH='*')
377

    
378
        # assert get success
379
        self.assertEqual(r.status_code, 200)
380

    
381
        # assert response content
382
        self.assertEqual(r.content, odata)
383

    
384
    def test_get_multiple_if_match(self):
385
        cname = self.containers[0]
386
        oname, odata = self.upload_object(cname)[:-1]
387

    
388
        # perform get with If-Match
389
        url = join_urls(self.pithos_path, self.user, cname, oname)
390

    
391
        if pithos_settings.UPDATE_MD5:
392
            etag = md5_hash(odata)
393
        else:
394
            etag = merkle(odata)
395

    
396
        quoted = lambda s: '"%s"' % s
397
        r = self.get(url, HTTP_IF_MATCH=','.join(
398
            [quoted(etag), quoted(get_random_data(64))]))
399

    
400
        # assert get success
401
        self.assertEqual(r.status_code, 200)
402

    
403
        # assert response content
404
        self.assertEqual(r.content, odata)
405

    
406
    def test_if_match_precondition_failed(self):
407
        cname = self.containers[0]
408
        oname, odata = self.upload_object(cname)[:-1]
409

    
410
        # perform get with If-Match
411
        url = join_urls(self.pithos_path, self.user, cname, oname)
412
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
413
        self.assertEqual(r.status_code, 412)
414

    
415
    def test_if_none_match(self):
416
        # upload object
417
        cname = self.containers[0]
418
        oname, odata = self.upload_object(cname)[:-1]
419

    
420
        if pithos_settings.UPDATE_MD5:
421
            etag = md5_hash(odata)
422
        else:
423
            etag = merkle(odata)
424

    
425
        # perform get with If-None-Match
426
        url = join_urls(self.pithos_path, self.user, cname, oname)
427
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
428

    
429
        # assert precondition_failed
430
        self.assertEqual(r.status_code, 304)
431

    
432
        # update object data
433
        r = self.append_object_data(cname, oname)[-1]
434
        self.assertTrue(etag != r['ETag'])
435

    
436
        # perform get with If-None-Match
437
        url = join_urls(self.pithos_path, self.user, cname, oname)
438
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
439

    
440
        # assert get success
441
        self.assertEqual(r.status_code, 200)
442

    
443
    def test_if_none_match_star(self):
444
        # upload object
445
        cname = self.containers[0]
446
        oname, odata = self.upload_object(cname)[:-1]
447

    
448
        # perform get with If-None-Match with star
449
        url = join_urls(self.pithos_path, self.user, cname, oname)
450
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
451

    
452
        self.assertEqual(r.status_code, 304)
453

    
454
    def test_if_modified_since(self):
455
        # upload object
456
        cname = self.containers[0]
457
        oname, odata = self.upload_object(cname)[:-1]
458
        object_info = self.get_object_info(cname, oname)
459
        last_modified = object_info['Last-Modified']
460
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
461
        t1_formats = map(t1.strftime, DATE_FORMATS)
462

    
463
        # Check not modified since
464
        url = join_urls(self.pithos_path, self.user, cname, oname)
465
        for t in t1_formats:
466
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
467
            self.assertEqual(r.status_code, 304)
468

    
469
        _time.sleep(1)
470

    
471
        # update object data
472
        appended_data = self.append_object_data(cname, oname)[1]
473

    
474
        # Check modified since
475
        url = join_urls(self.pithos_path, self.user, cname, oname)
476
        for t in t1_formats:
477
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
478
            self.assertEqual(r.status_code, 200)
479
            self.assertEqual(r.content, odata + appended_data)
480

    
481
    def test_if_modified_since_invalid_date(self):
482
        cname = self.containers[0]
483
        oname, odata = self.upload_object(cname)[:-1]
484
        url = join_urls(self.pithos_path, self.user, cname, oname)
485
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
486
        self.assertEqual(r.status_code, 200)
487
        self.assertEqual(r.content, odata)
488

    
489
    def test_if_not_modified_since(self):
490
        cname = self.containers[0]
491
        oname, odata = self.upload_object(cname)[:-1]
492
        url = join_urls(self.pithos_path, self.user, cname, oname)
493
        object_info = self.get_object_info(cname, oname)
494
        last_modified = object_info['Last-Modified']
495
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
496

    
497
        # Check unmodified
498
        t1 = t + datetime.timedelta(seconds=1)
499
        t1_formats = map(t1.strftime, DATE_FORMATS)
500
        for t in t1_formats:
501
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
502
            self.assertEqual(r.status_code, 200)
503
            self.assertEqual(r.content, odata)
504

    
505
        # modify object
506
        _time.sleep(2)
507
        self.append_object_data(cname, oname)
508

    
509
        object_info = self.get_object_info(cname, oname)
510
        last_modified = object_info['Last-Modified']
511
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
512
        t2 = t - datetime.timedelta(seconds=1)
513
        t2_formats = map(t2.strftime, DATE_FORMATS)
514

    
515
        # check modified
516
        for t in t2_formats:
517
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
518
            self.assertEqual(r.status_code, 412)
519

    
520
        # modify account: update object meta
521
        _time.sleep(1)
522
        self.update_object_meta(cname, oname, {'foo': 'bar'})
523

    
524
        object_info = self.get_object_info(cname, oname)
525
        last_modified = object_info['Last-Modified']
526
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
527
        t3 = t - datetime.timedelta(seconds=1)
528
        t3_formats = map(t3.strftime, DATE_FORMATS)
529

    
530
        # check modified
531
        for t in t3_formats:
532
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
533
            self.assertEqual(r.status_code, 412)
534

    
535
    def test_if_unmodified_since(self):
536
        cname = self.containers[0]
537
        oname, odata = self.upload_object(cname)[:-1]
538
        url = join_urls(self.pithos_path, self.user, cname, oname)
539
        object_info = self.get_object_info(cname, oname)
540
        last_modified = object_info['Last-Modified']
541
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
542
        t = t + datetime.timedelta(seconds=1)
543
        t_formats = map(t.strftime, DATE_FORMATS)
544

    
545
        for tf in t_formats:
546
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
547
            self.assertEqual(r.status_code, 200)
548
            self.assertEqual(r.content, odata)
549

    
550
    def test_if_unmodified_since_precondition_failed(self):
551
        cname = self.containers[0]
552
        oname, odata = self.upload_object(cname)[:-1]
553
        url = join_urls(self.pithos_path, self.user, cname, oname)
554
        object_info = self.get_object_info(cname, oname)
555
        last_modified = object_info['Last-Modified']
556
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
557
        t = t - datetime.timedelta(seconds=1)
558
        t_formats = map(t.strftime, DATE_FORMATS)
559

    
560
        for tf in t_formats:
561
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
562
            self.assertEqual(r.status_code, 412)
563

    
564
    def test_hashes(self):
565
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
566
        cname = self.containers[0]
567
        oname, odata = self.upload_object(cname, length=l)[:-1]
568
        size = len(odata)
569

    
570
        url = join_urls(self.pithos_path, self.user, cname, oname)
571
        r = self.get('%s?format=json&hashmap' % url)
572
        self.assertEqual(r.status_code, 200)
573
        body = json.loads(r.content)
574

    
575
        hashes = body['hashes']
576
        block_size = body['block_size']
577
        block_num = size / block_size if size / block_size == 0 else\
578
            size / block_size + 1
579
        self.assertTrue(len(hashes), block_num)
580
        i = 0
581
        for h in hashes:
582
            start = i * block_size
583
            end = (i + 1) * block_size
584
            hash = merkle(odata[start:end])
585
            self.assertEqual(h, hash)
586
            i += 1
587

    
588

    
589
class ObjectPut(PithosAPITest):
590
    def setUp(self):
591
        PithosAPITest.setUp(self)
592
        self.container = get_random_name()
593
        self.create_container(self.container)
594

    
595
    def test_upload(self):
596
        cname = self.container
597
        oname = get_random_name()
598
        data = get_random_data()
599
        meta = {'test': 'test1'}
600
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
601
                       for k, v in meta.iteritems())
602
        headers['HTTP_CONTENT_DISPOSITION'] = 'attachment; filename="%f2"'
603
        url = join_urls(self.pithos_path, self.user, cname, oname)
604
        r = self.put(url, data=data, content_type='application/pdf', **headers)
605
        self.assertEqual(r.status_code, 400)
606

    
607
        headers['HTTP_CONTENT_DISPOSITION'] = ('attachment; filename="%s"' %
608
                                               oname)
609
        url = join_urls(self.pithos_path, self.user, cname, oname)
610
        r = self.put(url, data=data, content_type='application/pdf', **headers)
611
        self.assertEqual(r.status_code, 201)
612
        self.assertTrue('ETag' in r)
613
        self.assertTrue('X-Object-Version' in r)
614

    
615
        info = self.get_object_info(cname, oname)
616

    
617
        # assert object meta
618
        self.assertTrue('X-Object-Meta-Test' in info)
619
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
620

    
621
        # assert content-type
622
        self.assertEqual(info['content-type'], 'application/pdf')
623

    
624
        # assert uploaded content
625
        r = self.get(url)
626
        self.assertEqual(r.status_code, 200)
627
        self.assertEqual(r.content, data)
628

    
629
    def test_maximum_upload_size_exceeds(self):
630
        cname = self.container
631
        oname = get_random_name()
632

    
633
        # set container quota to 100
634
        url = join_urls(self.pithos_path, self.user, cname)
635
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
636
        self.assertEqual(r.status_code, 202)
637

    
638
        info = self.get_container_info(cname)
639
        length = int(info['X-Container-Policy-Quota']) + 1
640

    
641
        data = get_random_data(length)
642
        url = join_urls(self.pithos_path, self.user, cname, oname)
643
        r = self.put(url, data=data)
644
        self.assertEqual(r.status_code, 413)
645

    
646
    def test_upload_with_name_containing_slash(self):
647
        cname = self.container
648
        oname = '/%s' % get_random_name()
649
        data = get_random_data()
650
        url = join_urls(self.pithos_path, self.user, cname, oname)
651
        r = self.put(url, data=data)
652
        self.assertEqual(r.status_code, 201)
653
        self.assertTrue('ETag' in r)
654
        self.assertTrue('X-Object-Version' in r)
655

    
656
        r = self.get(url)
657
        self.assertEqual(r.status_code, 200)
658
        self.assertEqual(r.content, data)
659

    
660
    def test_upload_unprocessable_entity(self):
661
        cname = self.container
662
        oname = get_random_name()
663
        data = get_random_data()
664
        url = join_urls(self.pithos_path, self.user, cname, oname)
665
        r = self.put(url, data=data, HTTP_ETAG='123')
666
        self.assertEqual(r.status_code, 422)
667

    
668
#    def test_chunked_transfer(self):
669
#        cname = self.container
670
#        oname = '/%s' % get_random_name()
671
#        data = get_random_data()
672
#        url = join_urls(self.pithos_path, self.user, cname, oname)
673
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
674
#        self.assertEqual(r.status_code, 201)
675
#        self.assertTrue('ETag' in r)
676
#        self.assertTrue('X-Object-Version' in r)
677

    
678
    def test_manifestation(self):
679
        cname = self.container
680
        prefix = 'myobject/'
681
        data = ''
682
        for i in range(random.randint(2, 10)):
683
            part = '%s%d' % (prefix, i)
684
            data += self.upload_object(cname, oname=part)[1]
685

    
686
        manifest = '%s/%s' % (cname, prefix)
687
        oname = get_random_name()
688
        url = join_urls(self.pithos_path, self.user, cname, oname)
689
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
690
        self.assertEqual(r.status_code, 201)
691

    
692
        # assert object exists
693
        r = self.get(url)
694
        self.assertEqual(r.status_code, 200)
695

    
696
        # assert its content
697
        self.assertEqual(r.content, data)
698

    
699
        # invalid manifestation
700
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
701
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
702
        r = self.get(url)
703
        self.assertEqual(r.content, '')
704

    
705
    def test_create_zero_length_object(self):
706
        cname = self.container
707
        oname = get_random_name()
708
        url = join_urls(self.pithos_path, self.user, cname, oname)
709
        r = self.put(url, data='')
710
        self.assertEqual(r.status_code, 201)
711

    
712
        r = self.get(url)
713
        self.assertEqual(r.status_code, 200)
714
        self.assertEqual(int(r['Content-Length']), 0)
715
        self.assertEqual(r.content, '')
716

    
717
        r = self.get('%s?hashmap=&format=json' % url)
718
        self.assertEqual(r.status_code, 200)
719
        body = json.loads(r.content)
720
        hashes = body['hashes']
721
        hash = merkle('')
722
        self.assertEqual(hashes, [hash])
723

    
724
    def test_create_object_by_hashmap(self):
725
        cname = self.container
726
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
727

    
728
        # upload an object
729
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
730
        # get it hashmap
731
        url = join_urls(self.pithos_path, self.user, cname, oname)
732
        r = self.get('%s?hashmap=&format=json' % url)
733

    
734
        oname = get_random_name()
735
        url = join_urls(self.pithos_path, self.user, cname, oname)
736
        r = self.put('%s?hashmap=' % url, data=r.content)
737
        self.assertEqual(r.status_code, 201)
738

    
739
        r = self.get(url)
740
        self.assertEqual(r.status_code, 200)
741
        self.assertEqual(r.content, data)
742

    
743
    def test_create_object_by_invalid_hashmap(self):
744
        cname = self.container
745
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
746

    
747
        # upload an object
748
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
749
        # get it hashmap
750
        url = join_urls(self.pithos_path, self.user, cname, oname)
751
        r = self.get('%s?hashmap=&format=json' % url)
752
        data = r.content
753
        try:
754
            hashmap = json.loads(data)
755
        except:
756
            self.fail('JSON format expected')
757

    
758
        oname = get_random_name()
759
        url = join_urls(self.pithos_path, self.user, cname, oname)
760
        hashmap['hashes'] = [get_random_name()]
761
        r = self.put('%s?hashmap=' % url, data=json.dumps(hashmap))
762
        self.assertEqual(r.status_code, 400)
763

    
764

    
765
class ObjectPutCopy(PithosAPITest):
766
    def setUp(self):
767
        PithosAPITest.setUp(self)
768
        self.container = 'c1'
769
        self.create_container(self.container)
770
        self.object, self.data = self.upload_object(self.container)[:-1]
771

    
772
        url = join_urls(
773
            self.pithos_path, self.user, self.container, self.object)
774
        r = self.head(url)
775
        self.etag = r['X-Object-Hash']
776

    
777
    def test_copy(self):
778
        with AssertMappingInvariant(self.get_object_info, self.container,
779
                                    self.object):
780
            # copy object
781
            oname = get_random_name()
782
            url = join_urls(self.pithos_path, self.user, self.container, oname)
783
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
784
                         HTTP_X_COPY_FROM='/%s/%s' % (
785
                             self.container, self.object))
786

    
787
            # assert copy success
788
            self.assertEqual(r.status_code, 201)
789

    
790
            # assert access the new object
791
            r = self.head(url)
792
            self.assertEqual(r.status_code, 200)
793
            self.assertTrue('X-Object-Meta-Test' in r)
794
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
795

    
796
            # assert etag is the same
797
            self.assertTrue('X-Object-Hash' in r)
798
            self.assertEqual(r['X-Object-Hash'], self.etag)
799

    
800
    def test_copy_from_different_container(self):
801
        cname = 'c2'
802
        self.create_container(cname)
803
        with AssertMappingInvariant(self.get_object_info, self.container,
804
                                    self.object):
805
            oname = get_random_name()
806
            url = join_urls(self.pithos_path, self.user, cname, oname)
807
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
808
                         HTTP_X_COPY_FROM='/%s/%s' % (
809
                             self.container, self.object))
810

    
811
            # assert copy success
812
            self.assertEqual(r.status_code, 201)
813

    
814
            # assert access the new object
815
            r = self.head(url)
816
            self.assertEqual(r.status_code, 200)
817
            self.assertTrue('X-Object-Meta-Test' in r)
818
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
819

    
820
            # assert etag is the same
821
            self.assertTrue('X-Object-Hash' in r)
822
            self.assertEqual(r['X-Object-Hash'], self.etag)
823

    
824
    def test_copy_from_other_account(self):
825
        cname = 'c2'
826
        self.create_container(cname, user='chuck')
827
        self.create_container(cname, user='alice')
828

    
829
        # share object for read with alice
830
        url = join_urls(self.pithos_path, self.user, self.container,
831
                        self.object)
832
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
833
                      HTTP_X_OBJECT_SHARING='read=alice')
834
        self.assertEqual(r.status_code, 202)
835

    
836
        # assert not allowed for chuck
837
        oname = get_random_name()
838
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
839
        r = self.put(url, data='', user='chuck',
840
                     HTTP_X_OBJECT_META_TEST='testcopy',
841
                     HTTP_X_COPY_FROM='/%s/%s' % (
842
                         self.container, self.object),
843
                     HTTP_X_SOURCE_ACCOUNT='user')
844

    
845
        self.assertEqual(r.status_code, 403)
846

    
847
        # assert copy success for alice
848
        url = join_urls(self.pithos_path, 'alice', cname, oname)
849
        r = self.put(url, data='', user='alice',
850
                     HTTP_X_OBJECT_META_TEST='testcopy',
851
                     HTTP_X_COPY_FROM='/%s/%s' % (
852
                         self.container, self.object),
853
                     HTTP_X_SOURCE_ACCOUNT='user')
854
        self.assertEqual(r.status_code, 201)
855

    
856
        # assert access the new object
857
        r = self.head(url, user='alice')
858
        self.assertEqual(r.status_code, 200)
859
        self.assertTrue('X-Object-Meta-Test' in r)
860
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
861

    
862
        # assert etag is the same
863
        self.assertTrue('X-Object-Hash' in r)
864
        self.assertEqual(r['X-Object-Hash'], self.etag)
865

    
866
        # share object for write
867
        url = join_urls(self.pithos_path, self.user, self.container,
868
                        self.object)
869
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
870
                      HTTP_X_OBJECT_SHARING='write=dan')
871
        self.assertEqual(r.status_code, 202)
872

    
873
        # assert not allowed copy for alice
874
        url = join_urls(self.pithos_path, 'alice', cname, oname)
875
        r = self.put(url, data='', user='alice',
876
                     HTTP_X_OBJECT_META_TEST='testcopy',
877
                     HTTP_X_COPY_FROM='/%s/%s' % (
878
                         self.container, self.object),
879
                     HTTP_X_SOURCE_ACCOUNT='user')
880
        self.assertEqual(r.status_code, 403)
881

    
882
        # assert allowed copy for dan
883
        self.create_container(cname, user='dan')
884
        url = join_urls(self.pithos_path, 'dan', cname, oname)
885
        r = self.put(url, data='', user='dan',
886
                     HTTP_X_OBJECT_META_TEST='testcopy',
887
                     HTTP_X_COPY_FROM='/%s/%s' % (
888
                         self.container, self.object),
889
                     HTTP_X_SOURCE_ACCOUNT='user')
890
        self.assertEqual(r.status_code, 201)
891

    
892
        # assert access the new object
893
        r = self.head(url, user='dan')
894
        self.assertEqual(r.status_code, 200)
895
        self.assertTrue('X-Object-Meta-Test' in r)
896
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
897

    
898
        # assert etag is the same
899
        self.assertTrue('X-Object-Hash' in r)
900
        self.assertEqual(r['X-Object-Hash'], self.etag)
901

    
902
        # assert source object still exists
903
        url = join_urls(self.pithos_path, self.user, self.container,
904
                        self.object)
905
        r = self.head(url)
906
        self.assertEqual(r.status_code, 200)
907
        # assert etag is the same
908
        self.assertTrue('X-Object-Hash' in r)
909
        self.assertEqual(r['X-Object-Hash'], self.etag)
910

    
911
        r = self.get(url)
912
        self.assertEqual(r.status_code, 200)
913
        # assert etag is the same
914
        self.assertTrue('X-Object-Hash' in r)
915
        self.assertEqual(r['X-Object-Hash'], self.etag)
916

    
917
    def test_copy_invalid(self):
918
        # copy from non-existent object
919
        oname = get_random_name()
920
        url = join_urls(self.pithos_path, self.user, self.container, oname)
921
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
922
                     HTTP_X_COPY_FROM='/%s/%s' % (
923
                         self.container, get_random_name()))
924
        self.assertEqual(r.status_code, 404)
925

    
926
        # copy from non-existent container
927
        oname = get_random_name()
928
        url = join_urls(self.pithos_path, self.user, self.container, oname)
929
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
930
                     HTTP_X_COPY_FROM='/%s/%s' % (
931
                         get_random_name(), self.object))
932
        self.assertEqual(r.status_code, 404)
933

    
934
    @pithos_test_settings(API_LIST_LIMIT=10)
935
    def test_copy_dir(self):
936
        folder = self.create_folder(self.container)[0]
937
        subfolder = self.create_folder(
938
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
939
        objects = [subfolder]
940
        append = objects.append
941
        for i in range(11):
942
            append(self.upload_object(self.container,
943
                                      '%s/%d' % (folder, i),
944
                                      depth='1')[0])
945
        other = self.upload_object(self.container, strnextling(folder))[0]
946

    
947
        # copy dir
948
        copy_folder = self.create_folder(self.container)[0]
949
        url = join_urls(self.pithos_path, self.user, self.container,
950
                        copy_folder)
951
        r = self.put('%s?delimiter=/' % url, data='',
952
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
953
        self.assertEqual(r.status_code, 201)
954

    
955
        for obj in objects:
956
            # assert object exists
957
            url = join_urls(self.pithos_path, self.user, self.container,
958
                            obj.replace(folder, copy_folder))
959
            r = self.head(url)
960
            self.assertEqual(r.status_code, 200)
961

    
962
            # assert metadata
963
            meta = self.get_object_meta(self.container, obj)
964
            for k in meta.keys():
965
                key = 'X-Object-Meta-%s' % k
966
                self.assertTrue(key in r)
967
                self.assertEqual(r[key], meta[k])
968

    
969
        # assert other has not been created under copy folder
970
        url = join_urls(self.pithos_path, self.user, self.container,
971
                        '%s/%s' % (copy_folder,
972
                                   other.replace(folder, copy_folder)))
973
        r = self.head(url)
974
        self.assertEqual(r.status_code, 404)
975

    
976

    
977
class ObjectPutMove(PithosAPITest):
978
    def setUp(self):
979
        PithosAPITest.setUp(self)
980
        self.container = 'c1'
981
        self.create_container(self.container)
982
        self.object, self.data = self.upload_object(self.container)[:-1]
983

    
984
        url = join_urls(
985
            self.pithos_path, self.user, self.container, self.object)
986
        r = self.head(url)
987
        self.etag = r['X-Object-Hash']
988

    
989
    def test_move(self):
990
        # move object
991
        oname = get_random_name()
992
        url = join_urls(self.pithos_path, self.user, self.container, oname)
993
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
994
                     HTTP_X_MOVE_FROM='/%s/%s' % (
995
                         self.container, self.object))
996

    
997
        # assert move success
998
        self.assertEqual(r.status_code, 201)
999

    
1000
        # assert access the new object
1001
        r = self.head(url)
1002
        self.assertEqual(r.status_code, 200)
1003
        self.assertTrue('X-Object-Meta-Test' in r)
1004
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1005

    
1006
        # assert etag is the same
1007
        self.assertTrue('X-Object-Hash' in r)
1008

    
1009
        # assert the initial object has been deleted
1010
        url = join_urls(self.pithos_path, self.user, self.container,
1011
                        self.object)
1012
        r = self.head(url)
1013
        self.assertEqual(r.status_code, 404)
1014

    
1015
    @pithos_test_settings(API_LIST_LIMIT=10)
1016
    def test_move_dir(self):
1017
        folder = self.create_folder(self.container)[0]
1018
        subfolder = self.create_folder(
1019
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1020
        objects = [subfolder]
1021
        append = objects.append
1022
        for i in range(11):
1023
            append(self.upload_object(self.container,
1024
                                      '%s/%d' % (folder, i),
1025
                                      depth='1')[0])
1026
        other = self.upload_object(self.container, strnextling(folder))[0]
1027

    
1028
        # move dir
1029
        copy_folder = self.create_folder(self.container)[0]
1030
        url = join_urls(self.pithos_path, self.user, self.container,
1031
                        copy_folder)
1032
        r = self.put('%s?delimiter=/' % url, data='',
1033
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
1034
        self.assertEqual(r.status_code, 201)
1035

    
1036
        for obj in objects:
1037
            # assert initial object does not exist
1038
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1039
            r = self.head(url)
1040
            self.assertEqual(r.status_code, 404)
1041

    
1042
            # assert new object was created
1043
            url = join_urls(self.pithos_path, self.user, self.container,
1044
                            obj.replace(folder, copy_folder))
1045
            r = self.head(url)
1046
            self.assertEqual(r.status_code, 200)
1047

    
1048
        # assert other has not been created under copy folder
1049
        url = join_urls(self.pithos_path, self.user, self.container,
1050
                        '%s/%s' % (copy_folder,
1051
                                   other.replace(folder, copy_folder)))
1052
        r = self.head(url)
1053
        self.assertEqual(r.status_code, 404)
1054

    
1055
    def test_move_from_other_account(self):
1056
        cname = 'c2'
1057
        self.create_container(cname, user='chuck')
1058
        self.create_container(cname, user='alice')
1059

    
1060
        # share object for read with alice
1061
        url = join_urls(self.pithos_path, self.user, self.container,
1062
                        self.object)
1063
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1064
                      HTTP_X_OBJECT_SHARING='read=alice')
1065
        self.assertEqual(r.status_code, 202)
1066

    
1067
        # assert not allowed move for chuck
1068
        oname = get_random_name()
1069
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1070
        r = self.put(url, data='', user='chuck',
1071
                     HTTP_X_OBJECT_META_TEST='testcopy',
1072
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1073
                         self.container, self.object),
1074
                     HTTP_X_SOURCE_ACCOUNT='user')
1075

    
1076
        self.assertEqual(r.status_code, 403)
1077

    
1078
        # assert no new object was created
1079
        r = self.head(url, user='chuck')
1080
        self.assertEqual(r.status_code, 404)
1081

    
1082
        # assert not allowed move for alice
1083
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1084
        r = self.put(url, data='', user='alice',
1085
                     HTTP_X_OBJECT_META_TEST='testcopy',
1086
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1087
                         self.container, self.object),
1088
                     HTTP_X_SOURCE_ACCOUNT='user')
1089
        self.assertEqual(r.status_code, 403)
1090

    
1091
        # assert no new object was created
1092
        r = self.head(url, user='alice')
1093
        self.assertEqual(r.status_code, 404)
1094

    
1095
        # share object for write with dan
1096
        url = join_urls(self.pithos_path, self.user, self.container,
1097
                        self.object)
1098
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1099
                      HTTP_X_OBJECT_SHARING='write=dan')
1100
        self.assertEqual(r.status_code, 202)
1101

    
1102
        # assert not allowed move for alice
1103
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1104
        r = self.put(url, data='', user='alice',
1105
                     HTTP_X_OBJECT_META_TEST='testcopy',
1106
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1107
                         self.container, self.object),
1108
                     HTTP_X_SOURCE_ACCOUNT='user')
1109
        self.assertEqual(r.status_code, 403)
1110

    
1111
        # assert no new object was created
1112
        r = self.head(url, user='alice')
1113
        self.assertEqual(r.status_code, 404)
1114

    
1115
        # assert not allowed move for dan
1116
        self.create_container(cname, user='dan')
1117
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1118
        r = self.put(url, data='', user='dan',
1119
                     HTTP_X_OBJECT_META_TEST='testcopy',
1120
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1121
                         self.container, self.object),
1122
                     HTTP_X_SOURCE_ACCOUNT='user')
1123
        self.assertEqual(r.status_code, 403)
1124

    
1125
        # assert no new object was created
1126
        r = self.head(url, user='dan')
1127
        self.assertEqual(r.status_code, 404)
1128

    
1129

    
1130
class ObjectCopy(PithosAPITest):
1131
    def setUp(self):
1132
        PithosAPITest.setUp(self)
1133
        self.container = 'c1'
1134
        self.create_container(self.container)
1135
        self.object, self.data = self.upload_object(self.container)[:-1]
1136

    
1137
        url = join_urls(
1138
            self.pithos_path, self.user, self.container, self.object)
1139
        r = self.head(url)
1140
        self.etag = r['X-Object-Hash']
1141

    
1142
    def test_copy(self):
1143
        with AssertMappingInvariant(self.get_object_info, self.container,
1144
                                    self.object):
1145
            oname = get_random_name()
1146
            # copy object
1147
            url = join_urls(self.pithos_path, self.user, self.container,
1148
                            self.object)
1149
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1150
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1151
                                                       oname))
1152
            # assert copy success
1153
            url = join_urls(self.pithos_path, self.user, self.container,
1154
                            oname)
1155
            self.assertEqual(r.status_code, 201)
1156

    
1157
            # assert access the new object
1158
            r = self.head(url)
1159
            self.assertEqual(r.status_code, 200)
1160
            self.assertTrue('X-Object-Meta-Test' in r)
1161
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1162

    
1163
            # assert etag is the same
1164
            self.assertTrue('X-Object-Hash' in r)
1165
            self.assertEqual(r['X-Object-Hash'], self.etag)
1166

    
1167
            # assert source object still exists
1168
            url = join_urls(self.pithos_path, self.user, self.container,
1169
                            self.object)
1170
            r = self.head(url)
1171
            self.assertEqual(r.status_code, 200)
1172

    
1173
            # assert etag is the same
1174
            self.assertTrue('X-Object-Hash' in r)
1175
            self.assertEqual(r['X-Object-Hash'], self.etag)
1176

    
1177
            r = self.get(url)
1178
            self.assertEqual(r.status_code, 200)
1179

    
1180
            # assert etag is the same
1181
            self.assertTrue('X-Object-Hash' in r)
1182
            self.assertEqual(r['X-Object-Hash'], self.etag)
1183

    
1184
            # copy object to other container (not existing)
1185
            cname = get_random_name()
1186
            url = join_urls(self.pithos_path, self.user, self.container,
1187
                            self.object)
1188
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1189
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1190

    
1191
            # assert destination container does not exist
1192
            url = join_urls(self.pithos_path, self.user, cname,
1193
                            self.object)
1194
            self.assertEqual(r.status_code, 404)
1195

    
1196
            # create container
1197
            self.create_container(cname)
1198

    
1199
            # copy object to other container (existing)
1200
            url = join_urls(self.pithos_path, self.user, self.container,
1201
                            self.object)
1202
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1203
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1204

    
1205
            # assert copy success
1206
            url = join_urls(self.pithos_path, self.user, cname,
1207
                            self.object)
1208
            self.assertEqual(r.status_code, 201)
1209

    
1210
            # assert access the new object
1211
            r = self.head(url)
1212
            self.assertEqual(r.status_code, 200)
1213
            self.assertTrue('X-Object-Meta-Test' in r)
1214
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1215

    
1216
            # assert etag is the same
1217
            self.assertTrue('X-Object-Hash' in r)
1218
            self.assertEqual(r['X-Object-Hash'], self.etag)
1219

    
1220
            # assert source object still exists
1221
            url = join_urls(self.pithos_path, self.user, self.container,
1222
                            self.object)
1223
            r = self.head(url)
1224
            self.assertEqual(r.status_code, 200)
1225

    
1226
            # assert etag is the same
1227
            self.assertTrue('X-Object-Hash' in r)
1228
            self.assertEqual(r['X-Object-Hash'], self.etag)
1229

    
1230
            r = self.get(url)
1231
            self.assertEqual(r.status_code, 200)
1232

    
1233
            # assert etag is the same
1234
            self.assertTrue('X-Object-Hash' in r)
1235
            self.assertEqual(r['X-Object-Hash'], self.etag)
1236

    
1237
    @pithos_test_settings(API_LIST_LIMIT=10)
1238
    def test_copy_dir_contents(self):
1239
        folder = self.create_folder(self.container)[0]
1240
        subfolder = self.create_folder(
1241
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1242
        objects = [subfolder]
1243
        append = objects.append
1244
        for i in range(11):
1245
            append(self.upload_object(self.container,
1246
                                      '%s/%d' % (folder, i),
1247
                                      depth='1')[0])
1248
        other = self.upload_object(self.container, strnextling(folder))[0]
1249

    
1250
        # copy dir
1251
        url = join_urls(self.pithos_path, self.user, self.container,
1252
                        folder)
1253
        copy_folder = self.create_folder(self.container)[0]
1254
        r = self.copy('%s?delimiter=/' % url,
1255
                      HTTP_X_OBJECT_META_TEST='testcopy',
1256
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1257
                                                   copy_folder))
1258
        self.assertEqual(r.status_code, 201)
1259

    
1260
        for obj in objects:
1261
            # assert object exists
1262
            url = join_urls(self.pithos_path, self.user, self.container,
1263
                            obj.replace(folder, copy_folder))
1264
            r = self.head(url)
1265
            self.assertEqual(r.status_code, 200)
1266

    
1267
        # assert other has not been created under copy folder
1268
        url = join_urls(self.pithos_path, self.user, self.container,
1269
                        '%s/%s' % (copy_folder,
1270
                                   other.replace(folder, copy_folder)))
1271
        r = self.head(url)
1272
        self.assertEqual(r.status_code, 404)
1273

    
1274
    def test_copy_to_other_account(self):
1275
        # create a container under alice account
1276
        cname = self.create_container(user='alice')[0]
1277

    
1278
        # create a folder under this container
1279
        folder = self.create_folder(cname, user='alice')[0]
1280

    
1281
        oname = get_random_name()
1282

    
1283
        # copy object to other account container
1284
        url = join_urls(self.pithos_path, self.user, self.container,
1285
                        self.object)
1286
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1287
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1288
                      HTTP_DESTINATION_ACCOUNT='alice')
1289
        self.assertEqual(r.status_code, 403)
1290

    
1291
        # share object for read with user
1292
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1293
        r = self.post(url, user='alice', content_type='',
1294
                      HTTP_CONTENT_RANGE='bytes */*',
1295
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1296
        self.assertEqual(r.status_code, 202)
1297

    
1298
        # assert copy object still is not allowed
1299
        url = join_urls(self.pithos_path, self.user, self.container,
1300
                        self.object)
1301
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1302
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1303
                      HTTP_DESTINATION_ACCOUNT='alice')
1304
        self.assertEqual(r.status_code, 403)
1305

    
1306
        # share object for write with user
1307
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1308
        r = self.post(url, user='alice',  content_type='',
1309
                      HTTP_CONTENT_RANGE='bytes */*',
1310
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1311
        self.assertEqual(r.status_code, 202)
1312

    
1313
        # assert copy object now is allowed
1314
        url = join_urls(self.pithos_path, self.user, self.container,
1315
                        self.object)
1316
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1317
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1318
                      HTTP_DESTINATION_ACCOUNT='alice')
1319
        self.assertEqual(r.status_code, 201)
1320

    
1321
        # assert access the new object
1322
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1323
        r = self.head(url, user='alice')
1324
        self.assertEqual(r.status_code, 200)
1325
        self.assertTrue('X-Object-Meta-Test' in r)
1326
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1327

    
1328
        # assert etag is the same
1329
        self.assertTrue('X-Object-Hash' in r)
1330
        self.assertEqual(r['X-Object-Hash'], self.etag)
1331

    
1332
        # assert source object still exists
1333
        url = join_urls(self.pithos_path, self.user, self.container,
1334
                        self.object)
1335
        r = self.head(url)
1336
        self.assertEqual(r.status_code, 200)
1337

    
1338
        # assert etag is the same
1339
        self.assertTrue('X-Object-Hash' in r)
1340
        self.assertEqual(r['X-Object-Hash'], self.etag)
1341

    
1342
        r = self.get(url)
1343
        self.assertEqual(r.status_code, 200)
1344

    
1345
        # assert etag is the same
1346
        self.assertTrue('X-Object-Hash' in r)
1347
        self.assertEqual(r['X-Object-Hash'], self.etag)
1348

    
1349

    
1350
class ObjectMove(PithosAPITest):
1351
    def setUp(self):
1352
        PithosAPITest.setUp(self)
1353
        self.container = 'c1'
1354
        self.create_container(self.container)
1355
        self.object, self.data = self.upload_object(self.container)[:-1]
1356

    
1357
        url = join_urls(
1358
            self.pithos_path, self.user, self.container, self.object)
1359
        r = self.head(url)
1360
        self.etag = r['X-Object-Hash']
1361

    
1362
    def test_move(self):
1363
        oname = get_random_name()
1364

    
1365
        # move object
1366
        url = join_urls(self.pithos_path, self.user, self.container,
1367
                        self.object)
1368
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1369
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1370
                                                   oname))
1371
        # assert move success
1372
        url = join_urls(self.pithos_path, self.user, self.container,
1373
                        oname)
1374
        self.assertEqual(r.status_code, 201)
1375

    
1376
        # assert access the new object
1377
        r = self.head(url)
1378
        self.assertEqual(r.status_code, 200)
1379
        self.assertTrue('X-Object-Meta-Test' in r)
1380
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1381

    
1382
        # assert etag is the same
1383
        self.assertTrue('X-Object-Hash' in r)
1384
        self.assertEqual(r['X-Object-Hash'], self.etag)
1385

    
1386
        # assert source object does not exist
1387
        url = join_urls(self.pithos_path, self.user, self.container,
1388
                        self.object)
1389
        r = self.head(url)
1390
        self.assertEqual(r.status_code, 404)
1391

    
1392
    @pithos_test_settings(API_LIST_LIMIT=10)
1393
    def test_move_dir_contents(self):
1394
        folder = self.create_folder(self.container)[0]
1395
        subfolder = self.create_folder(
1396
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1397
        objects = [subfolder]
1398
        append = objects.append
1399
        for i in range(11):
1400
            append(self.upload_object(self.container,
1401
                                      '%s/%d' % (folder, i),
1402
                                      depth='1')[0])
1403
        other = self.upload_object(self.container, strnextling(folder))[0]
1404

    
1405
        # copy dir
1406
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1407
        copy_folder = self.create_folder(self.container)[0]
1408
        r = self.move('%s?delimiter=/' % url,
1409
                      HTTP_X_OBJECT_META_TEST='testcopy',
1410
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1411
                                                   copy_folder))
1412
        self.assertEqual(r.status_code, 201)
1413

    
1414
        for obj in objects:
1415
            # assert object exists
1416
            url = join_urls(self.pithos_path, self.user, self.container,
1417
                            obj.replace(folder, copy_folder))
1418
            r = self.head(url)
1419
            self.assertEqual(r.status_code, 200)
1420

    
1421
        # assert other has not been created under copy folder
1422
        url = join_urls(self.pithos_path, self.user, self.container,
1423
                        '%s/%s' % (copy_folder,
1424
                                   other.replace(folder, copy_folder)))
1425
        r = self.head(url)
1426
        self.assertEqual(r.status_code, 404)
1427

    
1428
    def test_move_to_other_container(self):
1429
        # move object to other container (not existing)
1430
        cname = get_random_name()
1431
        url = join_urls(self.pithos_path, self.user, self.container,
1432
                        self.object)
1433
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1434
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1435

    
1436
        # assert destination container does not exist
1437
        url = join_urls(self.pithos_path, self.user, cname,
1438
                        self.object)
1439
        self.assertEqual(r.status_code, 404)
1440

    
1441
        # create container
1442
        self.create_container(cname)
1443

    
1444
        # move object to other container (existing)
1445
        url = join_urls(self.pithos_path, self.user, self.container,
1446
                        self.object)
1447
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1448
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1449

    
1450
        # assert move success
1451
        url = join_urls(self.pithos_path, self.user, cname,
1452
                        self.object)
1453
        self.assertEqual(r.status_code, 201)
1454

    
1455
        # assert access the new object
1456
        r = self.head(url)
1457
        self.assertEqual(r.status_code, 200)
1458
        self.assertTrue('X-Object-Meta-Test' in r)
1459
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1460

    
1461
        # assert etag is the same
1462
        self.assertTrue('X-Object-Hash' in r)
1463
        self.assertEqual(r['X-Object-Hash'], self.etag)
1464

    
1465
        # assert source object does not exist
1466
        url = join_urls(self.pithos_path, self.user, self.container,
1467
                        self.object)
1468
        r = self.head(url)
1469
        self.assertEqual(r.status_code, 404)
1470

    
1471
    def test_move_to_other_account(self):
1472
        # create a container under alice account
1473
        cname = self.create_container(user='alice')[0]
1474

    
1475
        # create a folder under this container
1476
        folder = self.create_folder(cname, user='alice')[0]
1477

    
1478
        oname = get_random_name()
1479

    
1480
        # move object to other account container
1481
        url = join_urls(self.pithos_path, self.user, self.container,
1482
                        self.object)
1483
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1484
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1485
                      HTTP_DESTINATION_ACCOUNT='alice')
1486
        self.assertEqual(r.status_code, 403)
1487

    
1488
        # share object for read with user
1489
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1490
        r = self.post(url, user='alice', content_type='',
1491
                      HTTP_CONTENT_RANGE='bytes */*',
1492
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1493
        self.assertEqual(r.status_code, 202)
1494

    
1495
        # assert move object still is not allowed
1496
        url = join_urls(self.pithos_path, self.user, self.container,
1497
                        self.object)
1498
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1499
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1500
                      HTTP_DESTINATION_ACCOUNT='alice')
1501
        self.assertEqual(r.status_code, 403)
1502

    
1503
        # share object for write with user
1504
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1505
        r = self.post(url, user='alice',  content_type='',
1506
                      HTTP_CONTENT_RANGE='bytes */*',
1507
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1508
        self.assertEqual(r.status_code, 202)
1509

    
1510
        # assert move object now is allowed
1511
        url = join_urls(self.pithos_path, self.user, self.container,
1512
                        self.object)
1513
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1514
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1515
                      HTTP_DESTINATION_ACCOUNT='alice')
1516
        self.assertEqual(r.status_code, 201)
1517

    
1518
        # assert source object does not exist
1519
        url = join_urls(self.pithos_path, self.user, self.container,
1520
                        self.object)
1521
        r = self.head(url)
1522
        self.assertEqual(r.status_code, 404)
1523

    
1524

    
1525
class ObjectPost(PithosAPITest):
1526
    def setUp(self):
1527
        PithosAPITest.setUp(self)
1528
        self.container = 'c1'
1529
        self.create_container(self.container)
1530
        self.object, self.object_data = self.upload_object(self.container)[:2]
1531

    
1532
    def test_update_meta(self):
1533
        with AssertUUidInvariant(self.get_object_info,
1534
                                 self.container,
1535
                                 self.object):
1536
            # update metadata
1537
            d = {'a' * 114: 'b' * 256}
1538
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1539
                          k, v in d.items())
1540
            url = join_urls(self.pithos_path, self.user, self.container,
1541
                            self.object)
1542
            r = self.post(url, content_type='', **kwargs)
1543
            self.assertEqual(r.status_code, 202)
1544

    
1545
            # assert metadata have been updated
1546
            meta = self.get_object_meta(self.container, self.object)
1547

    
1548
            for k, v in d.items():
1549
                self.assertTrue(k.title() in meta)
1550
                self.assertTrue(meta[k.title()], v)
1551

    
1552
            # Header key too large
1553
            d = {'a' * 115: 'b' * 256}
1554
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1555
                          k, v in d.items())
1556
            r = self.post(url, content_type='', **kwargs)
1557
            self.assertEqual(r.status_code, 400)
1558

    
1559
            # Header value too large
1560
            d = {'a' * 114: 'b' * 257}
1561
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
1562
                          k, v in d.items())
1563
            r = self.post(url, content_type='', **kwargs)
1564
            self.assertEqual(r.status_code, 400)
1565

    
1566
#            # Check utf-8 meta
1567
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
1568
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1569
#                          k, v in d.items())
1570
#            url = join_urls(self.pithos_path, self.user, self.container,
1571
#                            self.object)
1572
#            r = self.post(url, content_type='', **kwargs)
1573
#            self.assertEqual(r.status_code, 202)
1574
#
1575
#            # assert metadata have been updated
1576
#            meta = self.get_object_meta(self.container, self.object)
1577
#
1578
#            for k, v in d.items():
1579
#                key = 'X-Object-Meta-%s' % k.title()
1580
#                self.assertTrue(key in meta)
1581
#                self.assertTrue(meta[key], v)
1582
#
1583
#            # Header key too large
1584
#            d = {'α' * 114: 'β' * (256 / 2)}
1585
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1586
#                          k, v in d.items())
1587
#            r = self.post(url, content_type='', **kwargs)
1588
#            self.assertEqual(r.status_code, 400)
1589
#
1590
#            # Header value too large
1591
#            d = {'α' * 114: 'β' * 256}
1592
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
1593
#                          k, v in d.items())
1594
#            r = self.udpate(url, content_type='', **kwargs)
1595
#            self.assertEqual(r.status_code, 400)
1596

    
1597
    def test_update_object(self):
1598
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1599
        oname, odata = self.upload_object(
1600
            self.container, length=random.randint(
1601
                block_size + 2, 2 * block_size))[:2]
1602

    
1603
        length = len(odata)
1604
        first_byte_pos = random.randint(1, block_size)
1605
        last_byte_pos = random.randint(block_size + 1, length - 1)
1606
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1607
        kwargs = {'content_type': 'application/octet-stream',
1608
                  'HTTP_CONTENT_RANGE': range}
1609

    
1610
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1611
        partial = last_byte_pos - first_byte_pos + 1
1612
        data = get_random_data(partial)
1613
        r = self.post(url, data=data, **kwargs)
1614

    
1615
        self.assertEqual(r.status_code, 204)
1616
        self.assertTrue('ETag' in r)
1617
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1618
                                     data)
1619
        if pithos_settings.UPDATE_MD5:
1620
            etag = md5_hash(updated_data)
1621
        else:
1622
            etag = merkle(updated_data)
1623
        #self.assertEqual(r['ETag'], etag)
1624

    
1625
        # check modified object
1626
        r = self.get(url)
1627

    
1628
        self.assertEqual(r.status_code, 200)
1629
        self.assertEqual(r.content, updated_data)
1630
        self.assertEqual(etag, r['ETag'])
1631

    
1632
    def test_update_object_divided_by_blocksize(self):
1633
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1634
        oname, odata = self.upload_object(self.container,
1635
                                          length=2 * block_size)[:2]
1636

    
1637
        length = len(odata)
1638
        first_byte_pos = block_size
1639
        last_byte_pos = 2 * block_size - 1
1640
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1641
        kwargs = {'content_type': 'application/octet-stream',
1642
                  'HTTP_CONTENT_RANGE': range}
1643

    
1644
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1645
        partial = last_byte_pos - first_byte_pos + 1
1646
        data = get_random_data(partial)
1647
        r = self.post(url, data=data, **kwargs)
1648

    
1649
        self.assertEqual(r.status_code, 204)
1650
        self.assertTrue('ETag' in r)
1651
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1652
                                     data)
1653
        if pithos_settings.UPDATE_MD5:
1654
            etag = md5_hash(updated_data)
1655
        else:
1656
            etag = merkle(updated_data)
1657
        #self.assertEqual(r['ETag'], etag)
1658

    
1659
        # check modified object
1660
        r = self.get(url)
1661

    
1662
        self.assertEqual(r.status_code, 200)
1663
        self.assertEqual(r.content, updated_data)
1664
        self.assertEqual(etag, r['ETag'])
1665

    
1666
    def test_update_object_invalid_content_length(self):
1667
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1668
        oname, odata = self.upload_object(
1669
            self.container, length=random.randint(
1670
                block_size + 1, 2 * block_size))[:2]
1671

    
1672
        length = len(odata)
1673
        first_byte_pos = random.randint(1, block_size)
1674
        last_byte_pos = random.randint(block_size + 1, length - 1)
1675
        partial = last_byte_pos - first_byte_pos + 1
1676
        data = get_random_data(partial)
1677
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1678
        kwargs = {'content_type': 'application/octet-stream',
1679
                  'HTTP_CONTENT_RANGE': range,
1680
                  'CONTENT_LENGTH': str(partial + 1)}
1681

    
1682
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1683
        r = self.post(url, data=data, **kwargs)
1684

    
1685
        self.assertEqual(r.status_code, 400)
1686

    
1687
    def test_update_object_invalid_range(self):
1688
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1689
        oname, odata = self.upload_object(
1690
            self.container, length=random.randint(block_size + 1,
1691
                                                  2 * block_size))[:2]
1692

    
1693
        length = len(odata)
1694
        first_byte_pos = random.randint(1, block_size)
1695
        last_byte_pos = first_byte_pos - 1
1696
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1697
        kwargs = {'content_type': 'application/octet-stream',
1698
                  'HTTP_CONTENT_RANGE': range}
1699

    
1700
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1701
        r = self.post(url, data=get_random_data(), **kwargs)
1702

    
1703
        self.assertEqual(r.status_code, 416)
1704

    
1705
    def test_update_object_out_of_limits(self):
1706
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1707
        oname, odata = self.upload_object(
1708
            self.container, length=random.randint(block_size + 1,
1709
                                                  2 * block_size))[:2]
1710

    
1711
        length = len(odata)
1712
        first_byte_pos = random.randint(1, block_size)
1713
        last_byte_pos = length + 1
1714
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1715
        kwargs = {'content_type': 'application/octet-stream',
1716
                  'HTTP_CONTENT_RANGE': range}
1717

    
1718
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1719
        r = self.post(url, data=get_random_data(), **kwargs)
1720

    
1721
        self.assertEqual(r.status_code, 416)
1722

    
1723
    def test_append(self):
1724
        data = get_random_data()
1725
        length = len(data)
1726
        url = join_urls(self.pithos_path, self.user, self.container,
1727
                        self.object)
1728
        r = self.post(url, data=data, content_type='application/octet-stream',
1729
                      HTTP_CONTENT_LENGTH=str(length),
1730
                      HTTP_CONTENT_RANGE='bytes */*')
1731
        self.assertEqual(r.status_code, 204)
1732

    
1733
        r = self.get(url)
1734
        content = r.content
1735
        self.assertEqual(len(content), len(self.object_data) + length)
1736
        self.assertEqual(content, self.object_data + data)
1737

    
1738
    # TODO Fix the test
1739
    def _test_update_with_chunked_transfer(self):
1740
        data = get_random_data()
1741
        length = len(data)
1742

    
1743
        url = join_urls(self.pithos_path, self.user, self.container,
1744
                        self.object)
1745
        r = self.post(url, data=data, content_type='application/octet-stream',
1746
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1747
                      HTTP_TRANSFER_ENCODING='chunked')
1748
        self.assertEqual(r.status_code, 204)
1749

    
1750
        # check modified object
1751
        r = self.get(url)
1752
        content = r.content
1753
        self.assertEqual(content[0:length], data)
1754
        self.assertEqual(content[length:], self.object_data[length:])
1755

    
1756
    def test_update_from_other_object(self):
1757
        src = self.object
1758
        dest = get_random_name()
1759

    
1760
        url = join_urls(self.pithos_path, self.user, self.container, src)
1761
        r = self.get(url)
1762
        source_data = r.content
1763
        source_meta = self.get_object_info(self.container, src)
1764

    
1765
        # update zero length object
1766
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1767
        r = self.put(url, data='')
1768
        self.assertEqual(r.status_code, 201)
1769

    
1770
        r = self.post(url,
1771
                      HTTP_CONTENT_RANGE='bytes */*',
1772
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1773
        self.assertEqual(r.status_code, 204)
1774

    
1775
        r = self.get(url)
1776
        dest_data = r.content
1777
        dest_meta = self.get_object_info(self.container, dest)
1778

    
1779
        self.assertEqual(source_data, dest_data)
1780
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1781
        self.assertEqual(source_meta['X-Object-Hash'],
1782
                         dest_meta['X-Object-Hash'])
1783
        self.assertTrue(
1784
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1785

    
1786
    def test_update_range_from_other_object(self):
1787
        src = self.object
1788
        dest = get_random_name()
1789

    
1790
        url = join_urls(self.pithos_path, self.user, self.container, src)
1791
        r = self.get(url)
1792
        source_data = r.content
1793
        source_length = len(source_data)
1794

    
1795
        # update zero length object
1796
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1797
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1798
        initial_data = get_random_data(
1799
            length=source_length + random.randint(0, block_size))
1800

    
1801
        r = self.put(url, data=initial_data)
1802
        self.assertEqual(r.status_code, 201)
1803

    
1804
        offset = random.randint(0, source_length - 1)
1805
        upto = random.randint(offset, source_length - 1)
1806
        r = self.post(url,
1807
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1808
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1809
        self.assertEqual(r.status_code, 204)
1810

    
1811
        r = self.get(url)
1812
        content = r.content
1813
        self.assertEqual(content, (initial_data[:offset] +
1814
                                   source_data[:upto - offset + 1] +
1815
                                   initial_data[upto + 1:]))
1816

    
1817
    def test_update_range_from_invalid_other_object(self):
1818
        src = self.object
1819
        dest = get_random_name()
1820

    
1821
        url = join_urls(self.pithos_path, self.user, self.container, src)
1822
        r = self.get(url)
1823

    
1824
        # update zero length object
1825
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1826
        initial_data = get_random_data()
1827
        length = len(initial_data)
1828
        r = self.put(url, data=initial_data)
1829
        self.assertEqual(r.status_code, 201)
1830

    
1831
        offset = random.randint(1, length - 2)
1832
        upto = random.randint(offset, length - 1)
1833

    
1834
        # source object does not start with /
1835
        r = self.post(url,
1836
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1837
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1838
        self.assertEqual(r.status_code, 400)
1839

    
1840
        # source object does not exist
1841
        r = self.post(url,
1842
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1843
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1844
        self.assertEqual(r.status_code, 404)
1845

    
1846
    def test_restore_version(self):
1847
        info = self.get_object_info(self.container, self.object)
1848
        v = []
1849
        append = v.append
1850
        append((info['X-Object-Version'],
1851
                int(info['Content-Length']),
1852
                self.object_data))
1853

    
1854
        # update object
1855
        data, r = self.upload_object(self.container, self.object,
1856
                                     length=v[0][1] - 1)[1:]
1857
        self.assertTrue('X-Object-Version' in r)
1858
        append((r['X-Object-Version'], len(data), data))
1859
        # v[0][1] > v[1][1]
1860

    
1861
        # update with the previous version
1862
        url = join_urls(self.pithos_path, self.user, self.container,
1863
                        self.object)
1864
        r = self.post(url,
1865
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1866
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1867
                                                       self.object),
1868
                      HTTP_X_SOURCE_VERSION=v[0][0],
1869
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1870
        self.assertEqual(r.status_code, 204)
1871
        # v[2][1] = v[0][1] > v[1][1]
1872

    
1873
        # check content
1874
        r = self.get(url)
1875
        content = r.content
1876
        self.assertEqual(len(content), v[0][1])
1877
        self.assertEqual(content, self.object_data)
1878
        append((r['X-Object-Version'], len(content), content))
1879

    
1880
        # update object content(v4) > content(v2)
1881
        data, r = self.upload_object(self.container, self.object,
1882
                                     length=v[2][1] + 1)[1:]
1883
        self.assertTrue('X-Object-Version' in r)
1884
        append((r['X-Object-Version'], len(data), data))
1885
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1886

    
1887
        # update with the previous version
1888
        r = self.post(url,
1889
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1890
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1891
                                                       self.object),
1892
                      HTTP_X_SOURCE_VERSION=v[2][0],
1893
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1894
        self.assertEqual(r.status_code, 204)
1895
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1896

    
1897
        # check content
1898
        r = self.get(url)
1899
        data = r.content
1900
        self.assertEqual(data, v[2][2])
1901
        append((r['X-Object-Version'], len(data), data))
1902

    
1903
    def test_update_from_other_version(self):
1904
        versions = []
1905
        info = self.get_object_info(self.container, self.object)
1906
        versions.append(info['X-Object-Version'])
1907
        pre_length = int(info['Content-Length'])
1908

    
1909
        # update object
1910
        d1, r = self.upload_object(self.container, self.object,
1911
                                   length=pre_length - 1)[1:]
1912
        self.assertTrue('X-Object-Version' in r)
1913
        versions.append(r['X-Object-Version'])
1914

    
1915
        # update object
1916
        d2, r = self.upload_object(self.container, self.object,
1917
                                   length=pre_length - 2)[1:]
1918
        self.assertTrue('X-Object-Version' in r)
1919
        versions.append(r['X-Object-Version'])
1920

    
1921
        # get previous version
1922
        url = join_urls(self.pithos_path, self.user, self.container,
1923
                        self.object)
1924
        r = self.get('%s?version=list&format=json' % url)
1925
        self.assertEqual(r.status_code, 200)
1926
        l = json.loads(r.content)['versions']
1927
        self.assertEqual(len(l), 3)
1928
        self.assertEqual([str(v[0]) for v in l], versions)
1929

    
1930
        # update with the previous version
1931
        r = self.post(url,
1932
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1933
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1934
                                                       self.object),
1935
                      HTTP_X_SOURCE_VERSION=versions[0])
1936
        self.assertEqual(r.status_code, 204)
1937

    
1938
        # check content
1939
        r = self.get(url)
1940
        content = r.content
1941
        self.assertEqual(len(content), pre_length)
1942
        self.assertEqual(content, self.object_data)
1943

    
1944
        # update object
1945
        d3, r = self.upload_object(self.container, self.object,
1946
                                   length=len(d2) + 1)[1:]
1947
        self.assertTrue('X-Object-Version' in r)
1948
        versions.append(r['X-Object-Version'])
1949

    
1950
        # update with the previous version
1951
        r = self.post(url,
1952
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1953
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1954
                                                       self.object),
1955
                      HTTP_X_SOURCE_VERSION=versions[-2])
1956
        self.assertEqual(r.status_code, 204)
1957

    
1958
        # check content
1959
        r = self.get(url)
1960
        content = r.content
1961
        self.assertEqual(content, d2 + d3[-1])
1962

    
1963
    def test_update_invalid_permissions(self):
1964
        url = join_urls(self.pithos_path, self.user, self.container,
1965
                        self.object)
1966
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1967
                      HTTP_X_OBJECT_SHARING='%s' % (257*'a'))
1968
        self.assertEqual(r.status_code, 400)
1969

    
1970
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1971
                      HTTP_X_OBJECT_SHARING='read=%s' % (257*'a'))
1972
        self.assertEqual(r.status_code, 400)
1973

    
1974
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1975
                      HTTP_X_OBJECT_SHARING='write=%s' % (257*'a'))
1976
        self.assertEqual(r.status_code, 400)
1977

    
1978

    
1979
class ObjectDelete(PithosAPITest):
1980
    def setUp(self):
1981
        PithosAPITest.setUp(self)
1982
        self.container = 'c1'
1983
        self.create_container(self.container)
1984
        self.object, self.object_data = self.upload_object(self.container)[:2]
1985

    
1986
    def test_delete(self):
1987
        url = join_urls(self.pithos_path, self.user, self.container,
1988
                        self.object)
1989
        r = self.delete(url)
1990
        self.assertEqual(r.status_code, 204)
1991

    
1992
        r = self.head(url)
1993
        self.assertEqual(r.status_code, 404)
1994

    
1995
    def test_delete_non_existent(self):
1996
        url = join_urls(self.pithos_path, self.user, self.container,
1997
                        get_random_name())
1998
        r = self.delete(url)
1999
        self.assertEqual(r.status_code, 404)
2000

    
2001
    @pithos_test_settings(API_LIST_LIMIT=10)
2002
    def test_delete_dir(self):
2003
        folder = self.create_folder(self.container)[0]
2004
        subfolder = self.create_folder(
2005
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
2006
        objects = [subfolder]
2007
        append = objects.append
2008
        for i in range(11):
2009
            append(self.upload_object(self.container,
2010
                                      '%s/%d' % (folder, i),
2011
                                      depth='1')[0])
2012
        other = self.upload_object(self.container, strnextling(folder))[0]
2013

    
2014
        # move dir
2015
        url = join_urls(self.pithos_path, self.user, self.container, folder)
2016
        r = self.delete('%s?delimiter=/' % url)
2017
        self.assertEqual(r.status_code, 204)
2018

    
2019
        for obj in objects:
2020
            # assert object does not exist
2021
            url = join_urls(self.pithos_path, self.user, self.container, obj)
2022
            r = self.head(url)
2023
            self.assertEqual(r.status_code, 404)
2024

    
2025
        # assert other has not been deleted
2026
        url = join_urls(self.pithos_path, self.user, self.container, other)
2027
        r = self.head(url)
2028
        self.assertEqual(r.status_code, 200)