Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ e4c7cbeb

History | View | Annotate | Download (49.3 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def setUp(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, cname, oname)
68
        r = self.head(url)
69
        map(lambda i: self.assertTrue(i in r),
70
            ['Etag',
71
             'Content-Length',
72
             'Content-Type',
73
             'Last-Modified',
74
             'Content-Encoding',
75
             'Content-Disposition',
76
             'X-Object-Hash',
77
             'X-Object-UUID',
78
             'X-Object-Version',
79
             'X-Object-Version-Timestamp',
80
             'X-Object-Modified-By',
81
             'X-Object-Manifest'])
82

    
83

    
84
class ObjectGet(PithosAPITest):
85
    def setUp(self):
86
        PithosAPITest.setUp(self)
87
        self.containers = ['c1', 'c2']
88

    
89
        # create some containers
90
        for c in self.containers:
91
            self.create_container(c)
92

    
93
        # upload files
94
        self.objects = defaultdict(list)
95
        self.objects['c1'].append(self.upload_object('c1')[0])
96

    
97
    def test_versions(self):
98
        c = 'c1'
99
        o = self.objects[c][0]
100
        url = join_urls(self.pithos_path, self.user, c, o)
101

    
102
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
103
        r = self.post(url, content_type='', **meta)
104
        self.assertEqual(r.status_code, 202)
105

    
106
        url = join_urls(self.pithos_path, self.user, c, o)
107
        r = self.get('%s?version=list&format=json' % url)
108
        self.assertEqual(r.status_code, 200)
109
        l1 = json.loads(r.content)['versions']
110
        self.assertEqual(len(l1), 2)
111

    
112
        # update meta
113
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
114
                'HTTP_X_OBJECT_META_STOCK': 'True'}
115
        r = self.post(url, content_type='', **meta)
116
        self.assertEqual(r.status_code, 202)
117

    
118
        # assert a newly created version has been created
119
        r = self.get('%s?version=list&format=json' % url)
120
        self.assertEqual(r.status_code, 200)
121
        l2 = json.loads(r.content)['versions']
122
        self.assertEqual(len(l2), len(l1) + 1)
123
        self.assertEqual(l2[:-1], l1)
124

    
125
        vserial, _ = l2[-2]
126
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
127
                         {'X-Object-Meta-Quality': 'AAA'})
128

    
129
        # update data
130
        self.append_object_data(c, o)
131

    
132
        # assert a newly created version has been created
133
        r = self.get('%s?version=list&format=json' % url)
134
        self.assertEqual(r.status_code, 200)
135
        l3 = json.loads(r.content)['versions']
136
        self.assertEqual(len(l3), len(l2) + 1)
137
        self.assertEqual(l3[:-1], l2)
138

    
139
    def test_objects_with_trailing_spaces(self):
140
        # create object
141
        oname = self.upload_object('c1')[0]
142
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
143

    
144
        r = self.get(quote('%s ' % url))
145
        self.assertEqual(r.status_code, 404)
146

    
147
        # delete object
148
        self.delete(url)
149

    
150
        r = self.get(url)
151
        self.assertEqual(r.status_code, 404)
152

    
153
        # upload object with trailing space
154
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
155

    
156
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
157
        r = self.get(url)
158
        self.assertEqual(r.status_code, 200)
159

    
160
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
161
        r = self.get(url)
162
        self.assertEqual(r.status_code, 404)
163

    
164
    def test_get_partial(self):
165
        cname = self.containers[0]
166
        oname, odata = self.upload_object(cname, length=512)[:-1]
167
        url = join_urls(self.pithos_path, self.user, cname, oname)
168
        r = self.get(url, HTTP_RANGE='bytes=0-499')
169
        self.assertEqual(r.status_code, 206)
170
        data = r.content
171
        self.assertEqual(data, odata[:500])
172
        self.assertTrue('Content-Range' in r)
173
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
174
        self.assertTrue('Content-Type' in r)
175
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
176

    
177
    def test_get_final_500(self):
178
        cname = self.containers[0]
179
        oname, odata = self.upload_object(cname, length=512)[:-1]
180
        size = len(odata)
181
        url = join_urls(self.pithos_path, self.user, cname, oname)
182
        r = self.get(url, HTTP_RANGE='bytes=-500')
183
        self.assertEqual(r.status_code, 206)
184
        self.assertEqual(r.content, odata[-500:])
185
        self.assertTrue('Content-Range' in r)
186
        self.assertEqual(r['Content-Range'],
187
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
188
        self.assertTrue('Content-Type' in r)
189
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
190

    
191
    def test_get_rest(self):
192
        cname = self.containers[0]
193
        oname, odata = self.upload_object(cname, length=512)[:-1]
194
        size = len(odata)
195
        url = join_urls(self.pithos_path, self.user, cname, oname)
196
        offset = len(odata) - random.randint(1, 512)
197
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
198
        self.assertEqual(r.status_code, 206)
199
        self.assertEqual(r.content, odata[offset:])
200
        self.assertTrue('Content-Range' in r)
201
        self.assertEqual(r['Content-Range'],
202
                         'bytes %s-%s/%s' % (offset, size - 1, size))
203
        self.assertTrue('Content-Type' in r)
204
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
205

    
206
    def test_get_range_not_satisfiable(self):
207
        cname = self.containers[0]
208
        oname, odata = self.upload_object(cname, length=512)[:-1]
209
        url = join_urls(self.pithos_path, self.user, cname, oname)
210

    
211
        # TODO
212
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
213
        #self.assertEqual(r.status_code, 416)
214

    
215
        offset = len(odata) + 1
216
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
217
        self.assertEqual(r.status_code, 416)
218

    
219
    def test_multiple_range(self):
220
        cname = self.containers[0]
221
        oname, odata = self.upload_object(cname)[:-1]
222
        url = join_urls(self.pithos_path, self.user, cname, oname)
223

    
224
        l = ['0-499', '-500', '1000-']
225
        ranges = 'bytes=%s' % ','.join(l)
226
        r = self.get(url, HTTP_RANGE=ranges)
227
        self.assertEqual(r.status_code, 206)
228
        self.assertTrue('content-type' in r)
229
        p = re.compile(
230
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
231
            re.I)
232
        m = p.match(r['content-type'])
233
        if m is None:
234
            self.fail('Invalid multiple range content type')
235
        boundary = m.groupdict()['boundary']
236
        cparts = r.content.split('--%s' % boundary)[1:-1]
237

    
238
        # assert content parts length
239
        self.assertEqual(len(cparts), len(l))
240

    
241
        # for each content part assert headers
242
        i = 0
243
        for cpart in cparts:
244
            content = cpart.split('\r\n')
245
            headers = content[1:3]
246
            content_range = headers[0].split(': ')
247
            self.assertEqual(content_range[0], 'Content-Range')
248

    
249
            r = l[i].split('-')
250
            if not r[0] and not r[1]:
251
                pass
252
            elif not r[0]:
253
                start = len(odata) - int(r[1])
254
                end = len(odata)
255
            elif not r[1]:
256
                start = int(r[0])
257
                end = len(odata)
258
            else:
259
                start = int(r[0])
260
                end = int(r[1]) + 1
261
            fdata = odata[start:end]
262
            sdata = '\r\n'.join(content[4:-1])
263
            self.assertEqual(len(fdata), len(sdata))
264
            self.assertEquals(fdata, sdata)
265
            i += 1
266

    
267
    def test_multiple_range_not_satisfiable(self):
268
        # perform get with multiple range
269
        cname = self.containers[0]
270
        oname, odata = self.upload_object(cname)[:-1]
271
        out_of_range = len(odata) + 1
272
        l = ['0-499', '-500', '%d-' % out_of_range]
273
        ranges = 'bytes=%s' % ','.join(l)
274
        url = join_urls(self.pithos_path, self.user, cname, oname)
275
        r = self.get(url, HTTP_RANGE=ranges)
276
        self.assertEqual(r.status_code, 416)
277

    
278
    def test_get_if_match(self):
279
        cname = self.containers[0]
280
        oname, odata = self.upload_object(cname)[:-1]
281

    
282
        # perform get with If-Match
283
        url = join_urls(self.pithos_path, self.user, cname, oname)
284

    
285
        if pithos_settings.UPDATE_MD5:
286
            etag = md5_hash(odata)
287
        else:
288
            etag = merkle(odata)
289

    
290
        r = self.get(url, HTTP_IF_MATCH=etag)
291

    
292
        # assert get success
293
        self.assertEqual(r.status_code, 200)
294

    
295
        # assert response content
296
        self.assertEqual(r.content, odata)
297

    
298
    def test_get_if_match_star(self):
299
        cname = self.containers[0]
300
        oname, odata = self.upload_object(cname)[:-1]
301

    
302
        # perform get with If-Match *
303
        url = join_urls(self.pithos_path, self.user, cname, oname)
304
        r = self.get(url, HTTP_IF_MATCH='*')
305

    
306
        # assert get success
307
        self.assertEqual(r.status_code, 200)
308

    
309
        # assert response content
310
        self.assertEqual(r.content, odata)
311

    
312
    def test_get_multiple_if_match(self):
313
        cname = self.containers[0]
314
        oname, odata = self.upload_object(cname)[:-1]
315

    
316
        # perform get with If-Match
317
        url = join_urls(self.pithos_path, self.user, cname, oname)
318

    
319
        if pithos_settings.UPDATE_MD5:
320
            etag = md5_hash(odata)
321
        else:
322
            etag = merkle(odata)
323

    
324
        quoted = lambda s: '"%s"' % s
325
        r = self.get(url, HTTP_IF_MATCH=','.join(
326
            [quoted(etag), quoted(get_random_data(64))]))
327

    
328
        # assert get success
329
        self.assertEqual(r.status_code, 200)
330

    
331
        # assert response content
332
        self.assertEqual(r.content, odata)
333

    
334
    def test_if_match_precondition_failed(self):
335
        cname = self.containers[0]
336
        oname, odata = self.upload_object(cname)[:-1]
337

    
338
        # perform get with If-Match
339
        url = join_urls(self.pithos_path, self.user, cname, oname)
340
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
341
        self.assertEqual(r.status_code, 412)
342

    
343
    def test_if_none_match(self):
344
        # upload object
345
        cname = self.containers[0]
346
        oname, odata = self.upload_object(cname)[:-1]
347

    
348
        if pithos_settings.UPDATE_MD5:
349
            etag = md5_hash(odata)
350
        else:
351
            etag = merkle(odata)
352

    
353
        # perform get with If-None-Match
354
        url = join_urls(self.pithos_path, self.user, cname, oname)
355
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
356

    
357
        # assert precondition_failed
358
        self.assertEqual(r.status_code, 304)
359

    
360
        # update object data
361
        r = self.append_object_data(cname, oname)[-1]
362
        self.assertTrue(etag != r['ETag'])
363

    
364
        # perform get with If-None-Match
365
        url = join_urls(self.pithos_path, self.user, cname, oname)
366
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
367

    
368
        # assert get success
369
        self.assertEqual(r.status_code, 200)
370

    
371
    def test_if_none_match_star(self):
372
        # upload object
373
        cname = self.containers[0]
374
        oname, odata = self.upload_object(cname)[:-1]
375

    
376
        # perform get with If-None-Match with star
377
        url = join_urls(self.pithos_path, self.user, cname, oname)
378
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
379

    
380
        self.assertEqual(r.status_code, 304)
381

    
382
    def test_if_modified_since(self):
383
        # upload object
384
        cname = self.containers[0]
385
        oname, odata = self.upload_object(cname)[:-1]
386
        object_info = self.get_object_info(cname, oname)
387
        last_modified = object_info['Last-Modified']
388
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
389
        t1_formats = map(t1.strftime, DATE_FORMATS)
390

    
391
        # Check not modified since
392
        url = join_urls(self.pithos_path, self.user, cname, oname)
393
        for t in t1_formats:
394
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
395
            self.assertEqual(r.status_code, 304)
396

    
397
        _time.sleep(1)
398

    
399
        # update object data
400
        appended_data = self.append_object_data(cname, oname)[1]
401

    
402
        # Check modified since
403
        url = join_urls(self.pithos_path, self.user, cname, oname)
404
        for t in t1_formats:
405
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
406
            self.assertEqual(r.status_code, 200)
407
            self.assertEqual(r.content, odata + appended_data)
408

    
409
    def test_if_modified_since_invalid_date(self):
410
        cname = self.containers[0]
411
        oname, odata = self.upload_object(cname)[:-1]
412
        url = join_urls(self.pithos_path, self.user, cname, oname)
413
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
414
        self.assertEqual(r.status_code, 200)
415
        self.assertEqual(r.content, odata)
416

    
417
    def test_if_not_modified_since(self):
418
        cname = self.containers[0]
419
        oname, odata = self.upload_object(cname)[:-1]
420
        url = join_urls(self.pithos_path, self.user, cname, oname)
421
        object_info = self.get_object_info(cname, oname)
422
        last_modified = object_info['Last-Modified']
423
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
424

    
425
        # Check unmodified
426
        t1 = t + datetime.timedelta(seconds=1)
427
        t1_formats = map(t1.strftime, DATE_FORMATS)
428
        for t in t1_formats:
429
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
430
            self.assertEqual(r.status_code, 200)
431
            self.assertEqual(r.content, odata)
432

    
433
        # modify object
434
        _time.sleep(2)
435
        self.append_object_data(cname, oname)
436

    
437
        object_info = self.get_object_info(cname, oname)
438
        last_modified = object_info['Last-Modified']
439
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
440
        t2 = t - datetime.timedelta(seconds=1)
441
        t2_formats = map(t2.strftime, DATE_FORMATS)
442

    
443
        # check modified
444
        for t in t2_formats:
445
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
446
            self.assertEqual(r.status_code, 412)
447

    
448
        # modify account: update object meta
449
        _time.sleep(1)
450
        self.update_object_meta(cname, oname, {'foo': 'bar'})
451

    
452
        object_info = self.get_object_info(cname, oname)
453
        last_modified = object_info['Last-Modified']
454
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
455
        t3 = t - datetime.timedelta(seconds=1)
456
        t3_formats = map(t3.strftime, DATE_FORMATS)
457

    
458
        # check modified
459
        for t in t3_formats:
460
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
461
            self.assertEqual(r.status_code, 412)
462

    
463
    def test_if_unmodified_since(self):
464
        cname = self.containers[0]
465
        oname, odata = self.upload_object(cname)[:-1]
466
        url = join_urls(self.pithos_path, self.user, cname, oname)
467
        object_info = self.get_object_info(cname, oname)
468
        last_modified = object_info['Last-Modified']
469
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
470
        t = t + datetime.timedelta(seconds=1)
471
        t_formats = map(t.strftime, DATE_FORMATS)
472

    
473
        for tf in t_formats:
474
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
475
            self.assertEqual(r.status_code, 200)
476
            self.assertEqual(r.content, odata)
477

    
478
    def test_if_unmodified_since_precondition_failed(self):
479
        cname = self.containers[0]
480
        oname, odata = self.upload_object(cname)[:-1]
481
        url = join_urls(self.pithos_path, self.user, cname, oname)
482
        object_info = self.get_object_info(cname, oname)
483
        last_modified = object_info['Last-Modified']
484
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
485
        t = t - datetime.timedelta(seconds=1)
486
        t_formats = map(t.strftime, DATE_FORMATS)
487

    
488
        for tf in t_formats:
489
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
490
            self.assertEqual(r.status_code, 412)
491

    
492
    def test_hashes(self):
493
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
494
        cname = self.containers[0]
495
        oname, odata = self.upload_object(cname, length=l)[:-1]
496
        size = len(odata)
497

    
498
        url = join_urls(self.pithos_path, self.user, cname, oname)
499
        r = self.get('%s?format=json&hashmap' % url)
500
        self.assertEqual(r.status_code, 200)
501
        body = json.loads(r.content)
502

    
503
        hashes = body['hashes']
504
        block_size = body['block_size']
505
        block_num = size / block_size if size / block_size == 0 else\
506
            size / block_size + 1
507
        self.assertTrue(len(hashes), block_num)
508
        i = 0
509
        for h in hashes:
510
            start = i * block_size
511
            end = (i + 1) * block_size
512
            hash = merkle(odata[start:end])
513
            self.assertEqual(h, hash)
514
            i += 1
515

    
516

    
517
class ObjectPut(PithosAPITest):
518
    def setUp(self):
519
        PithosAPITest.setUp(self)
520
        self.container = get_random_name()
521
        self.create_container(self.container)
522

    
523
    def test_upload(self):
524
        cname = self.container
525
        oname = get_random_name()
526
        data = get_random_data()
527
        meta = {'test': 'test1'}
528
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
529
                       for k, v in meta.iteritems())
530
        url = join_urls(self.pithos_path, self.user, cname, oname)
531
        r = self.put(url, data=data, content_type='application/pdf', **headers)
532
        self.assertEqual(r.status_code, 201)
533
        self.assertTrue('ETag' in r)
534
        self.assertTrue('X-Object-Version' in r)
535

    
536
        info = self.get_object_info(cname, oname)
537

    
538
        # assert object meta
539
        self.assertTrue('X-Object-Meta-Test' in info)
540
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
541

    
542
        # assert content-type
543
        self.assertEqual(info['content-type'], 'application/pdf')
544

    
545
        # assert uploaded content
546
        r = self.get(url)
547
        self.assertEqual(r.status_code, 200)
548
        self.assertEqual(r.content, data)
549

    
550
    def test_maximum_upload_size_exceeds(self):
551
        cname = self.container
552
        oname = get_random_name()
553

    
554
        # set container quota to 100
555
        url = join_urls(self.pithos_path, self.user, cname)
556
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
557
        self.assertEqual(r.status_code, 202)
558

    
559
        info = self.get_container_info(cname)
560
        length = int(info['X-Container-Policy-Quota']) + 1
561

    
562
        data = get_random_data(length)
563
        url = join_urls(self.pithos_path, self.user, cname, oname)
564
        r = self.put(url, data=data)
565
        self.assertEqual(r.status_code, 413)
566

    
567
    def test_upload_with_name_containing_slash(self):
568
        cname = self.container
569
        oname = '/%s' % get_random_name()
570
        data = get_random_data()
571
        url = join_urls(self.pithos_path, self.user, cname, oname)
572
        r = self.put(url, data=data)
573
        self.assertEqual(r.status_code, 201)
574
        self.assertTrue('ETag' in r)
575
        self.assertTrue('X-Object-Version' in r)
576

    
577
        r = self.get(url)
578
        self.assertEqual(r.status_code, 200)
579
        self.assertEqual(r.content, data)
580

    
581
    def test_upload_unprocessable_entity(self):
582
        cname = self.container
583
        oname = get_random_name()
584
        data = get_random_data()
585
        url = join_urls(self.pithos_path, self.user, cname, oname)
586
        r = self.put(url, data=data, HTTP_ETAG='123')
587
        self.assertEqual(r.status_code, 422)
588

    
589
#    def test_chunked_transfer(self):
590
#        cname = self.container
591
#        oname = '/%s' % get_random_name()
592
#        data = get_random_data()
593
#        url = join_urls(self.pithos_path, self.user, cname, oname)
594
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
595
#        self.assertEqual(r.status_code, 201)
596
#        self.assertTrue('ETag' in r)
597
#        self.assertTrue('X-Object-Version' in r)
598

    
599
    def test_manifestation(self):
600
        cname = self.container
601
        prefix = 'myobject/'
602
        data = ''
603
        for i in range(random.randint(2, 10)):
604
            part = '%s%d' % (prefix, i)
605
            data += self.upload_object(cname, oname=part)[1]
606

    
607
        manifest = '%s/%s' % (cname, prefix)
608
        oname = get_random_name()
609
        url = join_urls(self.pithos_path, self.user, cname, oname)
610
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
611
        self.assertEqual(r.status_code, 201)
612

    
613
        # assert object exists
614
        r = self.get(url)
615
        self.assertEqual(r.status_code, 200)
616

    
617
        # assert its content
618
        self.assertEqual(r.content, data)
619

    
620
        # invalid manifestation
621
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
622
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
623
        r = self.get(url)
624
        self.assertEqual(r.content, '')
625

    
626
    def test_create_zero_length_object(self):
627
        cname = self.container
628
        oname = get_random_name()
629
        url = join_urls(self.pithos_path, self.user, cname, oname)
630
        r = self.put(url, data='')
631
        self.assertEqual(r.status_code, 201)
632

    
633
        r = self.get(url)
634
        self.assertEqual(r.status_code, 200)
635
        self.assertEqual(int(r['Content-Length']), 0)
636
        self.assertEqual(r.content, '')
637

    
638
        r = self.get('%s?hashmap=&format=json' % url)
639
        self.assertEqual(r.status_code, 200)
640
        body = json.loads(r.content)
641
        hashes = body['hashes']
642
        hash = merkle('')
643
        self.assertEqual(hashes, [hash])
644

    
645
    def test_create_object_by_hashmap(self):
646
        cname = self.container
647
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
648

    
649
        # upload an object
650
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
651
        # get it hashmap
652
        url = join_urls(self.pithos_path, self.user, cname, oname)
653
        r = self.get('%s?hashmap=&format=json' % url)
654

    
655
        oname = get_random_name()
656
        url = join_urls(self.pithos_path, self.user, cname, oname)
657
        r = self.put('%s?hashmap=' % url, data=r.content)
658
        self.assertEqual(r.status_code, 201)
659

    
660
        r = self.get(url)
661
        self.assertEqual(r.status_code, 200)
662
        self.assertEqual(r.content, data)
663

    
664

    
665
class ObjectCopy(PithosAPITest):
666
    def setUp(self):
667
        PithosAPITest.setUp(self)
668
        self.container = 'c1'
669
        self.create_container(self.container)
670
        self.object, self.data = self.upload_object(self.container)[:-1]
671

    
672
        url = join_urls(
673
            self.pithos_path, self.user, self.container, self.object)
674
        r = self.head(url)
675
        self.etag = r['X-Object-Hash']
676

    
677
    def test_copy(self):
678
        with AssertMappingInvariant(self.get_object_info, self.container,
679
                                    self.object):
680
            # copy object
681
            oname = get_random_name()
682
            url = join_urls(self.pithos_path, self.user, self.container, oname)
683
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
684
                         HTTP_X_COPY_FROM='/%s/%s' % (
685
                             self.container, self.object))
686

    
687
            # assert copy success
688
            self.assertEqual(r.status_code, 201)
689

    
690
            # assert access the new object
691
            r = self.head(url)
692
            self.assertEqual(r.status_code, 200)
693
            self.assertTrue('X-Object-Meta-Test' in r)
694
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
695

    
696
            # assert etag is the same
697
            self.assertTrue('X-Object-Hash' in r)
698
            self.assertEqual(r['X-Object-Hash'], self.etag)
699

    
700
    def test_copy_from_different_container(self):
701
        cname = 'c2'
702
        self.create_container(cname)
703
        with AssertMappingInvariant(self.get_object_info, self.container,
704
                                    self.object):
705
            oname = get_random_name()
706
            url = join_urls(self.pithos_path, self.user, cname, oname)
707
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
708
                         HTTP_X_COPY_FROM='/%s/%s' % (
709
                             self.container, self.object))
710

    
711
            # assert copy success
712
            self.assertEqual(r.status_code, 201)
713

    
714
            # assert access the new object
715
            r = self.head(url)
716
            self.assertEqual(r.status_code, 200)
717
            self.assertTrue('X-Object-Meta-Test' in r)
718
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
719

    
720
            # assert etag is the same
721
            self.assertTrue('X-Object-Hash' in r)
722
            self.assertEqual(r['X-Object-Hash'], self.etag)
723

    
724
    def test_copy_invalid(self):
725
        # copy from non-existent object
726
        oname = get_random_name()
727
        url = join_urls(self.pithos_path, self.user, self.container, oname)
728
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
729
                     HTTP_X_COPY_FROM='/%s/%s' % (
730
                         self.container, get_random_name()))
731
        self.assertEqual(r.status_code, 404)
732

    
733
        # copy from non-existent container
734
        oname = get_random_name()
735
        url = join_urls(self.pithos_path, self.user, self.container, oname)
736
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
737
                     HTTP_X_COPY_FROM='/%s/%s' % (
738
                         get_random_name(), self.object))
739
        self.assertEqual(r.status_code, 404)
740

    
741
    def test_copy_dir(self):
742
        folder = self.create_folder(self.container)[0]
743
        subfolder = self.create_folder(
744
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
745
        objects = [subfolder]
746
        append = objects.append
747
        append(self.upload_object(self.container,
748
                                  '%s/%s' % (folder, get_random_name()),
749
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
750
        append(self.upload_object(self.container,
751
                                  '%s/%s' % (subfolder, get_random_name()),
752
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
753
        other = self.upload_object(self.container, strnextling(folder))[0]
754

    
755
        # copy dir
756
        copy_folder = self.create_folder(self.container)[0]
757
        url = join_urls(self.pithos_path, self.user, self.container,
758
                        copy_folder)
759
        r = self.put('%s?delimiter=/' % url, data='',
760
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
761
        self.assertEqual(r.status_code, 201)
762

    
763
        for obj in objects:
764
            # assert object exists
765
            url = join_urls(self.pithos_path, self.user, self.container,
766
                            obj.replace(folder, copy_folder))
767
            r = self.head(url)
768
            self.assertEqual(r.status_code, 200)
769

    
770
            # assert metadata
771
            meta = self.get_object_meta(self.container, obj)
772
            for k in meta.keys():
773
                self.assertTrue(k in r)
774
                self.assertEqual(r[k], meta[k])
775

    
776
        # assert other has not been created under copy folder
777
        url = join_urls(self.pithos_path, self.user, self.container,
778
                        '%s/%s' % (copy_folder,
779
                                   other.replace(folder, copy_folder)))
780
        r = self.head(url)
781
        self.assertEqual(r.status_code, 404)
782

    
783

    
784
class ObjectMove(PithosAPITest):
785
    def setUp(self):
786
        PithosAPITest.setUp(self)
787
        self.container = 'c1'
788
        self.create_container(self.container)
789
        self.object, self.data = self.upload_object(self.container)[:-1]
790

    
791
        url = join_urls(
792
            self.pithos_path, self.user, self.container, self.object)
793
        r = self.head(url)
794
        self.etag = r['X-Object-Hash']
795

    
796
    def test_move(self):
797
        # move object
798
        oname = get_random_name()
799
        url = join_urls(self.pithos_path, self.user, self.container, oname)
800
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
801
                     HTTP_X_MOVE_FROM='/%s/%s' % (
802
                         self.container, self.object))
803

    
804
        # assert move success
805
        self.assertEqual(r.status_code, 201)
806

    
807
        # assert access the new object
808
        r = self.head(url)
809
        self.assertEqual(r.status_code, 200)
810
        self.assertTrue('X-Object-Meta-Test' in r)
811
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
812

    
813
        # assert etag is the same
814
        self.assertTrue('X-Object-Hash' in r)
815

    
816
        # assert the initial object has been deleted
817
        url = join_urls(self.pithos_path, self.user, self.container,
818
                        self.object)
819
        r = self.head(url)
820
        self.assertEqual(r.status_code, 404)
821

    
822
    def test_move_dir(self):
823
        folder = self.create_folder(self.container)[0]
824
        subfolder = self.create_folder(
825
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
826
        objects = [subfolder]
827
        append = objects.append
828
        meta = {}
829
        meta[objects[0]] = {}
830
        append(self.upload_object(self.container,
831
                                  '%s/%s' % (folder, get_random_name()),
832
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
833
        meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
834
        append(self.upload_object(self.container,
835
                                  '%s/%s' % (subfolder, get_random_name()),
836
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
837
        meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
838
        other = self.upload_object(self.container, strnextling(folder))[0]
839

    
840
        # move dir
841
        copy_folder = self.create_folder(self.container)[0]
842
        url = join_urls(self.pithos_path, self.user, self.container,
843
                        copy_folder)
844
        r = self.put('%s?delimiter=/' % url, data='',
845
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
846
        self.assertEqual(r.status_code, 201)
847

    
848
        for obj in objects:
849
            # assert initial object does not exist
850
            url = join_urls(self.pithos_path, self.user, self.container, obj)
851
            r = self.head(url)
852
            self.assertEqual(r.status_code, 404)
853

    
854
            # assert new object was created
855
            url = join_urls(self.pithos_path, self.user, self.container,
856
                            obj.replace(folder, copy_folder))
857
            r = self.head(url)
858
            self.assertEqual(r.status_code, 200)
859

    
860
#            # assert metadata
861
#            for k in meta[obj].keys():
862
#                self.assertTrue(k in r)
863
#                self.assertEqual(r[k], meta[obj][k])
864

    
865
        # assert other has not been created under copy folder
866
        url = join_urls(self.pithos_path, self.user, self.container,
867
                        '%s/%s' % (copy_folder,
868
                                   other.replace(folder, copy_folder)))
869
        r = self.head(url)
870
        self.assertEqual(r.status_code, 404)
871

    
872

    
873
class ObjectPost(PithosAPITest):
874
    def setUp(self):
875
        PithosAPITest.setUp(self)
876
        self.container = 'c1'
877
        self.create_container(self.container)
878
        self.object, self.object_data = self.upload_object(self.container)[:2]
879

    
880
    def test_update_meta(self):
881
        with AssertUUidInvariant(self.get_object_info,
882
                                 self.container,
883
                                 self.object):
884
            # update metadata
885
            d = {'a' * 114: 'b' * 256}
886
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
887
                          k, v in d.items())
888
            url = join_urls(self.pithos_path, self.user, self.container,
889
                            self.object)
890
            r = self.post(url, content_type='', **kwargs)
891
            self.assertEqual(r.status_code, 202)
892

    
893
            # assert metadata have been updated
894
            meta = self.get_object_meta(self.container, self.object)
895

    
896
            for k, v in d.items():
897
                key = 'X-Object-Meta-%s' % k.title()
898
                self.assertTrue(key in meta)
899
                self.assertTrue(meta[key], v)
900

    
901
            # Header key too large
902
            d = {'a' * 115: 'b' * 256}
903
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
904
                          k, v in d.items())
905
            r = self.post(url, content_type='', **kwargs)
906
            self.assertEqual(r.status_code, 400)
907

    
908
            # Header value too large
909
            d = {'a' * 114: 'b' * 257}
910
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
911
                          k, v in d.items())
912
            r = self.post(url, content_type='', **kwargs)
913
            self.assertEqual(r.status_code, 400)
914

    
915
#            # Check utf-8 meta
916
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
917
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
918
#                          k, v in d.items())
919
#            url = join_urls(self.pithos_path, self.user, self.container,
920
#                            self.object)
921
#            r = self.post(url, content_type='', **kwargs)
922
#            self.assertEqual(r.status_code, 202)
923
#
924
#            # assert metadata have been updated
925
#            meta = self.get_object_meta(self.container, self.object)
926
#
927
#            for k, v in d.items():
928
#                key = 'X-Object-Meta-%s' % k.title()
929
#                self.assertTrue(key in meta)
930
#                self.assertTrue(meta[key], v)
931
#
932
#            # Header key too large
933
#            d = {'α' * 114: 'β' * (256 / 2)}
934
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
935
#                          k, v in d.items())
936
#            r = self.post(url, content_type='', **kwargs)
937
#            self.assertEqual(r.status_code, 400)
938
#
939
#            # Header value too large
940
#            d = {'α' * 114: 'β' * 256}
941
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
942
#                          k, v in d.items())
943
#            r = self.udpate(url, content_type='', **kwargs)
944
#            self.assertEqual(r.status_code, 400)
945

    
946
    def test_update_object(self):
947
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
948
        oname, odata = self.upload_object(
949
            self.container, length=random.randint(
950
                block_size + 1, 2 * block_size))[:2]
951

    
952
        length = len(odata)
953
        first_byte_pos = random.randint(1, block_size)
954
        last_byte_pos = random.randint(block_size + 1, length - 1)
955
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
956
        kwargs = {'content_type': 'application/octet-stream',
957
                  'HTTP_CONTENT_RANGE': range}
958

    
959
        url = join_urls(self.pithos_path, self.user, self.container, oname)
960
        partial = last_byte_pos - first_byte_pos + 1
961
        data = get_random_data(partial)
962
        r = self.post(url, data=data, **kwargs)
963

    
964
        self.assertEqual(r.status_code, 204)
965
        self.assertTrue('ETag' in r)
966
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
967
                                     data)
968
        if pithos_settings.UPDATE_MD5:
969
            etag = md5_hash(updated_data)
970
        else:
971
            etag = merkle(updated_data)
972
        #self.assertEqual(r['ETag'], etag)
973

    
974
        # check modified object
975
        r = self.get(url)
976

    
977
        self.assertEqual(r.status_code, 200)
978
        self.assertEqual(r.content, updated_data)
979
        self.assertEqual(etag, r['ETag'])
980

    
981
    def test_update_object_divided_by_blocksize(self):
982
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
983
        oname, odata = self.upload_object(self.container,
984
                                          length=2 * block_size)[:2]
985

    
986
        length = len(odata)
987
        first_byte_pos = block_size
988
        last_byte_pos = 2 * block_size - 1
989
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
990
        kwargs = {'content_type': 'application/octet-stream',
991
                  'HTTP_CONTENT_RANGE': range}
992

    
993
        url = join_urls(self.pithos_path, self.user, self.container, oname)
994
        partial = last_byte_pos - first_byte_pos + 1
995
        data = get_random_data(partial)
996
        r = self.post(url, data=data, **kwargs)
997

    
998
        self.assertEqual(r.status_code, 204)
999
        self.assertTrue('ETag' in r)
1000
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1001
                                     data)
1002
        if pithos_settings.UPDATE_MD5:
1003
            etag = md5_hash(updated_data)
1004
        else:
1005
            etag = merkle(updated_data)
1006
        #self.assertEqual(r['ETag'], etag)
1007

    
1008
        # check modified object
1009
        r = self.get(url)
1010

    
1011
        self.assertEqual(r.status_code, 200)
1012
        self.assertEqual(r.content, updated_data)
1013
        self.assertEqual(etag, r['ETag'])
1014

    
1015
    def test_update_object_invalid_content_length(self):
1016
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1017
        oname, odata = self.upload_object(
1018
            self.container, length=random.randint(
1019
                block_size + 1, 2 * block_size))[:2]
1020

    
1021
        length = len(odata)
1022
        first_byte_pos = random.randint(1, block_size)
1023
        last_byte_pos = random.randint(block_size + 1, length - 1)
1024
        partial = last_byte_pos - first_byte_pos + 1
1025
        data = get_random_data(partial)
1026
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1027
        kwargs = {'content_type': 'application/octet-stream',
1028
                  'HTTP_CONTENT_RANGE': range,
1029
                  'CONTENT_LENGTH': partial + 1}
1030

    
1031
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1032
        r = self.post(url, data=data, **kwargs)
1033

    
1034
        self.assertEqual(r.status_code, 400)
1035

    
1036
    def test_update_object_invalid_range(self):
1037
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1038
        oname, odata = self.upload_object(
1039
            self.container, length=random.randint(block_size + 1,
1040
                                                  2 * block_size))[:2]
1041

    
1042
        length = len(odata)
1043
        first_byte_pos = random.randint(1, block_size)
1044
        last_byte_pos = first_byte_pos - 1
1045
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1046
        kwargs = {'content_type': 'application/octet-stream',
1047
                  'HTTP_CONTENT_RANGE': range}
1048

    
1049
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1050
        r = self.post(url, data=get_random_data(), **kwargs)
1051

    
1052
        self.assertEqual(r.status_code, 416)
1053

    
1054
    def test_update_object_out_of_limits(self):
1055
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1056
        oname, odata = self.upload_object(
1057
            self.container, length=random.randint(block_size + 1,
1058
                                                  2 * block_size))[:2]
1059

    
1060
        length = len(odata)
1061
        first_byte_pos = random.randint(1, block_size)
1062
        last_byte_pos = length + 1
1063
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1064
        kwargs = {'content_type': 'application/octet-stream',
1065
                  'HTTP_CONTENT_RANGE': range}
1066

    
1067
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1068
        r = self.post(url, data=get_random_data(), **kwargs)
1069

    
1070
        self.assertEqual(r.status_code, 416)
1071

    
1072
    def test_append(self):
1073
        data = get_random_data()
1074
        length = len(data)
1075
        url = join_urls(self.pithos_path, self.user, self.container,
1076
                        self.object)
1077
        r = self.post(url, data=data, content_type='application/octet-stream',
1078
                      HTTP_CONTENT_LENGTH=str(length),
1079
                      HTTP_CONTENT_RANGE='bytes */*')
1080
        self.assertEqual(r.status_code, 204)
1081

    
1082
        r = self.get(url)
1083
        content = r.content
1084
        self.assertEqual(len(content), len(self.object_data) + length)
1085
        self.assertEqual(content, self.object_data + data)
1086

    
1087
    # TODO Fix the test
1088
    def _test_update_with_chunked_transfer(self):
1089
        data = get_random_data()
1090
        length = len(data)
1091

    
1092
        url = join_urls(self.pithos_path, self.user, self.container,
1093
                        self.object)
1094
        r = self.post(url, data=data, content_type='application/octet-stream',
1095
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1096
                      HTTP_TRANSFER_ENCODING='chunked')
1097
        self.assertEqual(r.status_code, 204)
1098

    
1099
        # check modified object
1100
        r = self.get(url)
1101
        content = r.content
1102
        self.assertEqual(content[0:length], data)
1103
        self.assertEqual(content[length:], self.object_data[length:])
1104

    
1105
    def test_update_from_other_object(self):
1106
        src = self.object
1107
        dest = get_random_name()
1108

    
1109
        url = join_urls(self.pithos_path, self.user, self.container, src)
1110
        r = self.get(url)
1111
        source_data = r.content
1112
        source_meta = self.get_object_info(self.container, src)
1113

    
1114
        # update zero length object
1115
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1116
        r = self.put(url, data='')
1117
        self.assertEqual(r.status_code, 201)
1118

    
1119
        r = self.post(url,
1120
                      HTTP_CONTENT_RANGE='bytes */*',
1121
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1122
        self.assertEqual(r.status_code, 204)
1123

    
1124
        r = self.get(url)
1125
        dest_data = r.content
1126
        dest_meta = self.get_object_info(self.container, dest)
1127

    
1128
        self.assertEqual(source_data, dest_data)
1129
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1130
        self.assertEqual(source_meta['X-Object-Hash'],
1131
                         dest_meta['X-Object-Hash'])
1132
        self.assertTrue(
1133
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1134

    
1135
    def test_update_range_from_other_object(self):
1136
        src = self.object
1137
        dest = get_random_name()
1138

    
1139
        url = join_urls(self.pithos_path, self.user, self.container, src)
1140
        r = self.get(url)
1141
        source_data = r.content
1142

    
1143
        # update zero length object
1144
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1145
        initial_data = get_random_data()
1146
        length = len(initial_data)
1147
        r = self.put(url, data=initial_data)
1148
        self.assertEqual(r.status_code, 201)
1149

    
1150
        offset = random.randint(1, length - 2)
1151
        upto = random.randint(offset, length - 1)
1152
        r = self.post(url,
1153
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1154
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1155
        self.assertEqual(r.status_code, 204)
1156

    
1157
        r = self.get(url)
1158
        content = r.content
1159
        self.assertEqual(content, (initial_data[:offset] +
1160
                                   source_data[:upto - offset + 1] +
1161
                                   initial_data[upto + 1:]))
1162

    
1163
    def test_update_range_from_invalid_other_object(self):
1164
        src = self.object
1165
        dest = get_random_name()
1166

    
1167
        url = join_urls(self.pithos_path, self.user, self.container, src)
1168
        r = self.get(url)
1169

    
1170
        # update zero length object
1171
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1172
        initial_data = get_random_data()
1173
        length = len(initial_data)
1174
        r = self.put(url, data=initial_data)
1175
        self.assertEqual(r.status_code, 201)
1176

    
1177
        offset = random.randint(1, length - 2)
1178
        upto = random.randint(offset, length - 1)
1179

    
1180
        # source object does not start with /
1181
        r = self.post(url,
1182
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1183
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1184
        self.assertEqual(r.status_code, 400)
1185

    
1186
        # source object does not exist
1187
        r = self.post(url,
1188
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1189
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1190
        self.assertEqual(r.status_code, 404)
1191

    
1192
    def test_update_from_other_version(self):
1193
        versions = []
1194
        info = self.get_object_info(self.container, self.object)
1195
        versions.append(info['X-Object-Version'])
1196
        pre_length = int(info['Content-Length'])
1197

    
1198
        # update object
1199
        d1, r = self.upload_object(self.container, self.object,
1200
                                   length=pre_length - 1)[1:]
1201
        self.assertTrue('X-Object-Version' in r)
1202
        versions.append(r['X-Object-Version'])
1203

    
1204
        # update object
1205
        d2, r = self.upload_object(self.container, self.object,
1206
                                   length=pre_length - 2)[1:]
1207
        self.assertTrue('X-Object-Version' in r)
1208
        versions.append(r['X-Object-Version'])
1209

    
1210
        # get previous version
1211
        url = join_urls(self.pithos_path, self.user, self.container,
1212
                        self.object)
1213
        r = self.get('%s?version=list&format=json' % url)
1214
        self.assertEqual(r.status_code, 200)
1215
        l = json.loads(r.content)['versions']
1216
        self.assertEqual(len(l), 3)
1217
        self.assertEqual([str(v[0]) for v in l], versions)
1218

    
1219
        # update with the previous version
1220
        r = self.post(url,
1221
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1222
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1223
                                                       self.object),
1224
                      HTTP_X_SOURCE_VERSION=versions[0])
1225
        self.assertEqual(r.status_code, 204)
1226

    
1227
        # check content
1228
        r = self.get(url)
1229
        content = r.content
1230
        self.assertEqual(len(content), pre_length)
1231
        self.assertEqual(content, self.object_data)
1232

    
1233
        # update object
1234
        d3, r = self.upload_object(self.container, self.object,
1235
                                   length=len(d2) + 1)[1:]
1236
        self.assertTrue('X-Object-Version' in r)
1237
        versions.append(r['X-Object-Version'])
1238

    
1239
        # update with the previous version
1240
        r = self.post(url,
1241
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1242
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1243
                                                       self.object),
1244
                      HTTP_X_SOURCE_VERSION=versions[-2])
1245
        self.assertEqual(r.status_code, 204)
1246

    
1247
        # check content
1248
        r = self.get(url)
1249
        content = r.content
1250
        self.assertEqual(content, d2 + d3[-1])
1251

    
1252

    
1253
class ObjectDelete(PithosAPITest):
1254
    def setUp(self):
1255
        PithosAPITest.setUp(self)
1256
        self.container = 'c1'
1257
        self.create_container(self.container)
1258
        self.object, self.object_data = self.upload_object(self.container)[:2]
1259

    
1260
    def test_delete(self):
1261
        url = join_urls(self.pithos_path, self.user, self.container,
1262
                        self.object)
1263
        r = self.delete(url)
1264
        self.assertEqual(r.status_code, 204)
1265

    
1266
        r = self.head(url)
1267
        self.assertEqual(r.status_code, 404)
1268

    
1269
    def test_delete_non_existent(self):
1270
        url = join_urls(self.pithos_path, self.user, self.container,
1271
                        get_random_name())
1272
        r = self.delete(url)
1273
        self.assertEqual(r.status_code, 404)
1274

    
1275
    def test_delete_dir(self):
1276
        folder = self.create_folder(self.container)[0]
1277
        subfolder = self.create_folder(
1278
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1279
        objects = [subfolder]
1280
        append = objects.append
1281
        meta = {}
1282
        meta[objects[0]] = {}
1283
        append(self.upload_object(self.container,
1284
                                  '%s/%s' % (folder, get_random_name()),
1285
                                  HTTP_X_OBJECT_META_DEPTH='1')[0])
1286
        meta[objects[1]] = {'X-Object-Meta-Depth': '1'}
1287
        append(self.upload_object(self.container,
1288
                                  '%s/%s' % (subfolder, get_random_name()),
1289
                                  HTTP_X_OBJECT_META_DEPTH='2')[0])
1290
        meta[objects[1]] = {'X-Object-Meta-Depth': '2'}
1291
        other = self.upload_object(self.container, strnextling(folder))[0]
1292

    
1293
        # move dir
1294
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1295
        r = self.delete('%s?delimiter=/' % url)
1296
        self.assertEqual(r.status_code, 204)
1297

    
1298
        for obj in objects:
1299
            # assert object does not exist
1300
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1301
            r = self.head(url)
1302
            self.assertEqual(r.status_code, 404)
1303

    
1304
        # assert other has not been deleted
1305
        url = join_urls(self.pithos_path, self.user, self.container, other)
1306
        r = self.head(url)
1307
        self.assertEqual(r.status_code, 200)