Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / objects.py @ 6ee6677e

History | View | Annotate | Download (50 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from collections import defaultdict
38
from urllib import quote
39
from functools import partial
40

    
41
from pithos.api.test import (PithosAPITest, pithos_settings,
42
                             AssertMappingInvariant, AssertUUidInvariant,
43
                             TEST_BLOCK_SIZE, TEST_HASH_ALGORITHM,
44
                             DATE_FORMATS)
45
from pithos.api.test.util import (md5_hash, merkle, strnextling,
46
                                  get_random_data, get_random_name)
47

    
48
from synnefo.lib import join_urls
49

    
50
import django.utils.simplejson as json
51

    
52
import random
53
import re
54
import datetime
55
import time as _time
56

    
57
merkle = partial(merkle,
58
                 blocksize=TEST_BLOCK_SIZE,
59
                 blockhash=TEST_HASH_ALGORITHM)
60

    
61

    
62
class ObjectHead(PithosAPITest):
63
    def setUp(self):
64
        cname = self.create_container()[0]
65
        oname, odata = self.upload_object(cname)[:-1]
66

    
67
        url = join_urls(self.pithos_path, cname, oname)
68
        r = self.head(url)
69
        map(lambda i: self.assertTrue(i in r),
70
            ['Etag',
71
             'Content-Length',
72
             'Content-Type',
73
             'Last-Modified',
74
             'Content-Encoding',
75
             'Content-Disposition',
76
             'X-Object-Hash',
77
             'X-Object-UUID',
78
             'X-Object-Version',
79
             'X-Object-Version-Timestamp',
80
             'X-Object-Modified-By',
81
             'X-Object-Manifest'])
82

    
83

    
84
class ObjectGet(PithosAPITest):
85
    def setUp(self):
86
        PithosAPITest.setUp(self)
87
        self.containers = ['c1', 'c2']
88

    
89
        # create some containers
90
        for c in self.containers:
91
            self.create_container(c)
92

    
93
        # upload files
94
        self.objects = defaultdict(list)
95
        self.objects['c1'].append(self.upload_object('c1')[0])
96

    
97
    def test_versions(self):
98
        c = 'c1'
99
        o = self.objects[c][0]
100
        url = join_urls(self.pithos_path, self.user, c, o)
101

    
102
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
103
        r = self.post(url, content_type='', **meta)
104
        self.assertEqual(r.status_code, 202)
105

    
106
        url = join_urls(self.pithos_path, self.user, c, o)
107
        r = self.get('%s?version=list&format=json' % url)
108
        self.assertEqual(r.status_code, 200)
109
        l1 = json.loads(r.content)['versions']
110
        self.assertEqual(len(l1), 2)
111

    
112
        # update meta
113
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AB',
114
                'HTTP_X_OBJECT_META_STOCK': 'True'}
115
        r = self.post(url, content_type='', **meta)
116
        self.assertEqual(r.status_code, 202)
117

    
118
        # assert a newly created version has been created
119
        r = self.get('%s?version=list&format=json' % url)
120
        self.assertEqual(r.status_code, 200)
121
        l2 = json.loads(r.content)['versions']
122
        self.assertEqual(len(l2), len(l1) + 1)
123
        self.assertEqual(l2[:-1], l1)
124

    
125
        vserial, _ = l2[-2]
126
        self.assertEqual(self.get_object_meta(c, o, version=vserial),
127
                         {'Quality': 'AAA'})
128

    
129
        # update data
130
        self.append_object_data(c, o)
131

    
132
        # assert a newly created version has been created
133
        r = self.get('%s?version=list&format=json' % url)
134
        self.assertEqual(r.status_code, 200)
135
        l3 = json.loads(r.content)['versions']
136
        self.assertEqual(len(l3), len(l2) + 1)
137
        self.assertEqual(l3[:-1], l2)
138

    
139
    def test_get_version(self):
140
        c = 'c1'
141
        o = self.objects[c][0]
142
        url = join_urls(self.pithos_path, self.user, c, o)
143

    
144
        # Update metadata
145
        meta = {'HTTP_X_OBJECT_META_QUALITY': 'AAA'}
146
        r = self.post(url, content_type='', **meta)
147
        self.assertEqual(r.status_code, 202)
148

    
149
        url = join_urls(self.pithos_path, self.user, c, o)
150
        r = self.get('%s?version=list&format=json' % url)
151
        self.assertEqual(r.status_code, 200)
152
        l = json.loads(r.content)['versions']
153
        self.assertEqual(len(l), 2)
154

    
155
        r = self.head('%s?version=%s' % (url, l[0][0]))
156
        self.assertEqual(r.status_code, 200)
157
        self.assertTrue('X-Object-Meta-Quality' not in r)
158

    
159
        r = self.head('%s?version=%s' % (url, l[1][0]))
160
        self.assertEqual(r.status_code, 200)
161
        self.assertTrue('X-Object-Meta-Quality' in r)
162

    
163
        # test invalid version
164
        r = self.head('%s?version=-1' % url)
165
        self.assertEqual(r.status_code, 404)
166

    
167
        other_name, other_data, r = self.upload_object(c)
168
        self.assertTrue('X-Object-Version' in r)
169
        other_version = r['X-Object-Version']
170

    
171
        self.assertTrue(o != other_name)
172

    
173
        r = self.get('%s?version=%s' % (url, other_version))
174
        self.assertEqual(r.status_code, 404)
175

    
176
        r = self.head('%s?version=%s' % (url, other_version))
177
        self.assertEqual(r.status_code, 404)
178

    
179
    def test_objects_with_trailing_spaces(self):
180
        # create object
181
        oname = self.upload_object('c1')[0]
182
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
183

    
184
        r = self.get(quote('%s ' % url))
185
        self.assertEqual(r.status_code, 404)
186

    
187
        # delete object
188
        self.delete(url)
189

    
190
        r = self.get(url)
191
        self.assertEqual(r.status_code, 404)
192

    
193
        # upload object with trailing space
194
        oname = self.upload_object('c1', quote('%s ' % get_random_name()))[0]
195

    
196
        url = join_urls(self.pithos_path, self.user, 'c1', oname)
197
        r = self.get(url)
198
        self.assertEqual(r.status_code, 200)
199

    
200
        url = join_urls(self.pithos_path, self.user, 'c1', oname[:-1])
201
        r = self.get(url)
202
        self.assertEqual(r.status_code, 404)
203

    
204
    def test_get_partial(self):
205
        cname = self.containers[0]
206
        oname, odata = self.upload_object(cname, length=512)[:-1]
207
        url = join_urls(self.pithos_path, self.user, cname, oname)
208
        r = self.get(url, HTTP_RANGE='bytes=0-499')
209
        self.assertEqual(r.status_code, 206)
210
        data = r.content
211
        self.assertEqual(data, odata[:500])
212
        self.assertTrue('Content-Range' in r)
213
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
214
        self.assertTrue('Content-Type' in r)
215
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
216

    
217
    def test_get_final_500(self):
218
        cname = self.containers[0]
219
        oname, odata = self.upload_object(cname, length=512)[:-1]
220
        size = len(odata)
221
        url = join_urls(self.pithos_path, self.user, cname, oname)
222
        r = self.get(url, HTTP_RANGE='bytes=-500')
223
        self.assertEqual(r.status_code, 206)
224
        self.assertEqual(r.content, odata[-500:])
225
        self.assertTrue('Content-Range' in r)
226
        self.assertEqual(r['Content-Range'],
227
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
228
        self.assertTrue('Content-Type' in r)
229
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
230

    
231
    def test_get_rest(self):
232
        cname = self.containers[0]
233
        oname, odata = self.upload_object(cname, length=512)[:-1]
234
        size = len(odata)
235
        url = join_urls(self.pithos_path, self.user, cname, oname)
236
        offset = len(odata) - random.randint(1, 512)
237
        r = self.get(url, HTTP_RANGE='bytes=%s-' % offset)
238
        self.assertEqual(r.status_code, 206)
239
        self.assertEqual(r.content, odata[offset:])
240
        self.assertTrue('Content-Range' in r)
241
        self.assertEqual(r['Content-Range'],
242
                         'bytes %s-%s/%s' % (offset, size - 1, size))
243
        self.assertTrue('Content-Type' in r)
244
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
245

    
246
    def test_get_range_not_satisfiable(self):
247
        cname = self.containers[0]
248
        oname, odata = self.upload_object(cname, length=512)[:-1]
249
        url = join_urls(self.pithos_path, self.user, cname, oname)
250

    
251
        # TODO
252
        #r = self.get(url, HTTP_RANGE='bytes=50-10')
253
        #self.assertEqual(r.status_code, 416)
254

    
255
        offset = len(odata) + 1
256
        r = self.get(url, HTTP_RANGE='bytes=0-%s' % offset)
257
        self.assertEqual(r.status_code, 416)
258

    
259
    def test_multiple_range(self):
260
        cname = self.containers[0]
261
        oname, odata = self.upload_object(cname)[:-1]
262
        url = join_urls(self.pithos_path, self.user, cname, oname)
263

    
264
        l = ['0-499', '-500', '1000-']
265
        ranges = 'bytes=%s' % ','.join(l)
266
        r = self.get(url, HTTP_RANGE=ranges)
267
        self.assertEqual(r.status_code, 206)
268
        self.assertTrue('content-type' in r)
269
        p = re.compile(
270
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
271
            re.I)
272
        m = p.match(r['content-type'])
273
        if m is None:
274
            self.fail('Invalid multiple range content type')
275
        boundary = m.groupdict()['boundary']
276
        cparts = r.content.split('--%s' % boundary)[1:-1]
277

    
278
        # assert content parts length
279
        self.assertEqual(len(cparts), len(l))
280

    
281
        # for each content part assert headers
282
        i = 0
283
        for cpart in cparts:
284
            content = cpart.split('\r\n')
285
            headers = content[1:3]
286
            content_range = headers[0].split(': ')
287
            self.assertEqual(content_range[0], 'Content-Range')
288

    
289
            r = l[i].split('-')
290
            if not r[0] and not r[1]:
291
                pass
292
            elif not r[0]:
293
                start = len(odata) - int(r[1])
294
                end = len(odata)
295
            elif not r[1]:
296
                start = int(r[0])
297
                end = len(odata)
298
            else:
299
                start = int(r[0])
300
                end = int(r[1]) + 1
301
            fdata = odata[start:end]
302
            sdata = '\r\n'.join(content[4:-1])
303
            self.assertEqual(len(fdata), len(sdata))
304
            self.assertEquals(fdata, sdata)
305
            i += 1
306

    
307
    def test_multiple_range_not_satisfiable(self):
308
        # perform get with multiple range
309
        cname = self.containers[0]
310
        oname, odata = self.upload_object(cname)[:-1]
311
        out_of_range = len(odata) + 1
312
        l = ['0-499', '-500', '%d-' % out_of_range]
313
        ranges = 'bytes=%s' % ','.join(l)
314
        url = join_urls(self.pithos_path, self.user, cname, oname)
315
        r = self.get(url, HTTP_RANGE=ranges)
316
        self.assertEqual(r.status_code, 416)
317

    
318
    def test_get_if_match(self):
319
        cname = self.containers[0]
320
        oname, odata = self.upload_object(cname)[:-1]
321

    
322
        # perform get with If-Match
323
        url = join_urls(self.pithos_path, self.user, cname, oname)
324

    
325
        if pithos_settings.UPDATE_MD5:
326
            etag = md5_hash(odata)
327
        else:
328
            etag = merkle(odata)
329

    
330
        r = self.get(url, HTTP_IF_MATCH=etag)
331

    
332
        # assert get success
333
        self.assertEqual(r.status_code, 200)
334

    
335
        # assert response content
336
        self.assertEqual(r.content, odata)
337

    
338
    def test_get_if_match_star(self):
339
        cname = self.containers[0]
340
        oname, odata = self.upload_object(cname)[:-1]
341

    
342
        # perform get with If-Match *
343
        url = join_urls(self.pithos_path, self.user, cname, oname)
344
        r = self.get(url, HTTP_IF_MATCH='*')
345

    
346
        # assert get success
347
        self.assertEqual(r.status_code, 200)
348

    
349
        # assert response content
350
        self.assertEqual(r.content, odata)
351

    
352
    def test_get_multiple_if_match(self):
353
        cname = self.containers[0]
354
        oname, odata = self.upload_object(cname)[:-1]
355

    
356
        # perform get with If-Match
357
        url = join_urls(self.pithos_path, self.user, cname, oname)
358

    
359
        if pithos_settings.UPDATE_MD5:
360
            etag = md5_hash(odata)
361
        else:
362
            etag = merkle(odata)
363

    
364
        quoted = lambda s: '"%s"' % s
365
        r = self.get(url, HTTP_IF_MATCH=','.join(
366
            [quoted(etag), quoted(get_random_data(64))]))
367

    
368
        # assert get success
369
        self.assertEqual(r.status_code, 200)
370

    
371
        # assert response content
372
        self.assertEqual(r.content, odata)
373

    
374
    def test_if_match_precondition_failed(self):
375
        cname = self.containers[0]
376
        oname, odata = self.upload_object(cname)[:-1]
377

    
378
        # perform get with If-Match
379
        url = join_urls(self.pithos_path, self.user, cname, oname)
380
        r = self.get(url, HTTP_IF_MATCH=get_random_name())
381
        self.assertEqual(r.status_code, 412)
382

    
383
    def test_if_none_match(self):
384
        # upload object
385
        cname = self.containers[0]
386
        oname, odata = self.upload_object(cname)[:-1]
387

    
388
        if pithos_settings.UPDATE_MD5:
389
            etag = md5_hash(odata)
390
        else:
391
            etag = merkle(odata)
392

    
393
        # perform get with If-None-Match
394
        url = join_urls(self.pithos_path, self.user, cname, oname)
395
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
396

    
397
        # assert precondition_failed
398
        self.assertEqual(r.status_code, 304)
399

    
400
        # update object data
401
        r = self.append_object_data(cname, oname)[-1]
402
        self.assertTrue(etag != r['ETag'])
403

    
404
        # perform get with If-None-Match
405
        url = join_urls(self.pithos_path, self.user, cname, oname)
406
        r = self.get(url, HTTP_IF_NONE_MATCH=etag)
407

    
408
        # assert get success
409
        self.assertEqual(r.status_code, 200)
410

    
411
    def test_if_none_match_star(self):
412
        # upload object
413
        cname = self.containers[0]
414
        oname, odata = self.upload_object(cname)[:-1]
415

    
416
        # perform get with If-None-Match with star
417
        url = join_urls(self.pithos_path, self.user, cname, oname)
418
        r = self.get(url, HTTP_IF_NONE_MATCH='*')
419

    
420
        self.assertEqual(r.status_code, 304)
421

    
422
    def test_if_modified_since(self):
423
        # upload object
424
        cname = self.containers[0]
425
        oname, odata = self.upload_object(cname)[:-1]
426
        object_info = self.get_object_info(cname, oname)
427
        last_modified = object_info['Last-Modified']
428
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
429
        t1_formats = map(t1.strftime, DATE_FORMATS)
430

    
431
        # Check not modified since
432
        url = join_urls(self.pithos_path, self.user, cname, oname)
433
        for t in t1_formats:
434
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
435
            self.assertEqual(r.status_code, 304)
436

    
437
        _time.sleep(1)
438

    
439
        # update object data
440
        appended_data = self.append_object_data(cname, oname)[1]
441

    
442
        # Check modified since
443
        url = join_urls(self.pithos_path, self.user, cname, oname)
444
        for t in t1_formats:
445
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
446
            self.assertEqual(r.status_code, 200)
447
            self.assertEqual(r.content, odata + appended_data)
448

    
449
    def test_if_modified_since_invalid_date(self):
450
        cname = self.containers[0]
451
        oname, odata = self.upload_object(cname)[:-1]
452
        url = join_urls(self.pithos_path, self.user, cname, oname)
453
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
454
        self.assertEqual(r.status_code, 200)
455
        self.assertEqual(r.content, odata)
456

    
457
    def test_if_not_modified_since(self):
458
        cname = self.containers[0]
459
        oname, odata = self.upload_object(cname)[:-1]
460
        url = join_urls(self.pithos_path, self.user, cname, oname)
461
        object_info = self.get_object_info(cname, oname)
462
        last_modified = object_info['Last-Modified']
463
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
464

    
465
        # Check unmodified
466
        t1 = t + datetime.timedelta(seconds=1)
467
        t1_formats = map(t1.strftime, DATE_FORMATS)
468
        for t in t1_formats:
469
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
470
            self.assertEqual(r.status_code, 200)
471
            self.assertEqual(r.content, odata)
472

    
473
        # modify object
474
        _time.sleep(2)
475
        self.append_object_data(cname, oname)
476

    
477
        object_info = self.get_object_info(cname, oname)
478
        last_modified = object_info['Last-Modified']
479
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
480
        t2 = t - datetime.timedelta(seconds=1)
481
        t2_formats = map(t2.strftime, DATE_FORMATS)
482

    
483
        # check modified
484
        for t in t2_formats:
485
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
486
            self.assertEqual(r.status_code, 412)
487

    
488
        # modify account: update object meta
489
        _time.sleep(1)
490
        self.update_object_meta(cname, oname, {'foo': 'bar'})
491

    
492
        object_info = self.get_object_info(cname, oname)
493
        last_modified = object_info['Last-Modified']
494
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
495
        t3 = t - datetime.timedelta(seconds=1)
496
        t3_formats = map(t3.strftime, DATE_FORMATS)
497

    
498
        # check modified
499
        for t in t3_formats:
500
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
501
            self.assertEqual(r.status_code, 412)
502

    
503
    def test_if_unmodified_since(self):
504
        cname = self.containers[0]
505
        oname, odata = self.upload_object(cname)[:-1]
506
        url = join_urls(self.pithos_path, self.user, cname, oname)
507
        object_info = self.get_object_info(cname, oname)
508
        last_modified = object_info['Last-Modified']
509
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
510
        t = t + datetime.timedelta(seconds=1)
511
        t_formats = map(t.strftime, DATE_FORMATS)
512

    
513
        for tf in t_formats:
514
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
515
            self.assertEqual(r.status_code, 200)
516
            self.assertEqual(r.content, odata)
517

    
518
    def test_if_unmodified_since_precondition_failed(self):
519
        cname = self.containers[0]
520
        oname, odata = self.upload_object(cname)[:-1]
521
        url = join_urls(self.pithos_path, self.user, cname, oname)
522
        object_info = self.get_object_info(cname, oname)
523
        last_modified = object_info['Last-Modified']
524
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
525
        t = t - datetime.timedelta(seconds=1)
526
        t_formats = map(t.strftime, DATE_FORMATS)
527

    
528
        for tf in t_formats:
529
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
530
            self.assertEqual(r.status_code, 412)
531

    
532
    def test_hashes(self):
533
        l = random.randint(2, 5) * pithos_settings.BACKEND_BLOCK_SIZE
534
        cname = self.containers[0]
535
        oname, odata = self.upload_object(cname, length=l)[:-1]
536
        size = len(odata)
537

    
538
        url = join_urls(self.pithos_path, self.user, cname, oname)
539
        r = self.get('%s?format=json&hashmap' % url)
540
        self.assertEqual(r.status_code, 200)
541
        body = json.loads(r.content)
542

    
543
        hashes = body['hashes']
544
        block_size = body['block_size']
545
        block_num = size / block_size if size / block_size == 0 else\
546
            size / block_size + 1
547
        self.assertTrue(len(hashes), block_num)
548
        i = 0
549
        for h in hashes:
550
            start = i * block_size
551
            end = (i + 1) * block_size
552
            hash = merkle(odata[start:end])
553
            self.assertEqual(h, hash)
554
            i += 1
555

    
556

    
557
class ObjectPut(PithosAPITest):
558
    def setUp(self):
559
        PithosAPITest.setUp(self)
560
        self.container = get_random_name()
561
        self.create_container(self.container)
562

    
563
    def test_upload(self):
564
        cname = self.container
565
        oname = get_random_name()
566
        data = get_random_data()
567
        meta = {'test': 'test1'}
568
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
569
                       for k, v in meta.iteritems())
570
        url = join_urls(self.pithos_path, self.user, cname, oname)
571
        r = self.put(url, data=data, content_type='application/pdf', **headers)
572
        self.assertEqual(r.status_code, 201)
573
        self.assertTrue('ETag' in r)
574
        self.assertTrue('X-Object-Version' in r)
575

    
576
        info = self.get_object_info(cname, oname)
577

    
578
        # assert object meta
579
        self.assertTrue('X-Object-Meta-Test' in info)
580
        self.assertEqual(info['X-Object-Meta-Test'], 'test1')
581

    
582
        # assert content-type
583
        self.assertEqual(info['content-type'], 'application/pdf')
584

    
585
        # assert uploaded content
586
        r = self.get(url)
587
        self.assertEqual(r.status_code, 200)
588
        self.assertEqual(r.content, data)
589

    
590
    def test_maximum_upload_size_exceeds(self):
591
        cname = self.container
592
        oname = get_random_name()
593

    
594
        # set container quota to 100
595
        url = join_urls(self.pithos_path, self.user, cname)
596
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
597
        self.assertEqual(r.status_code, 202)
598

    
599
        info = self.get_container_info(cname)
600
        length = int(info['X-Container-Policy-Quota']) + 1
601

    
602
        data = get_random_data(length)
603
        url = join_urls(self.pithos_path, self.user, cname, oname)
604
        r = self.put(url, data=data)
605
        self.assertEqual(r.status_code, 413)
606

    
607
    def test_upload_with_name_containing_slash(self):
608
        cname = self.container
609
        oname = '/%s' % get_random_name()
610
        data = get_random_data()
611
        url = join_urls(self.pithos_path, self.user, cname, oname)
612
        r = self.put(url, data=data)
613
        self.assertEqual(r.status_code, 201)
614
        self.assertTrue('ETag' in r)
615
        self.assertTrue('X-Object-Version' in r)
616

    
617
        r = self.get(url)
618
        self.assertEqual(r.status_code, 200)
619
        self.assertEqual(r.content, data)
620

    
621
    def test_upload_unprocessable_entity(self):
622
        cname = self.container
623
        oname = get_random_name()
624
        data = get_random_data()
625
        url = join_urls(self.pithos_path, self.user, cname, oname)
626
        r = self.put(url, data=data, HTTP_ETAG='123')
627
        self.assertEqual(r.status_code, 422)
628

    
629
#    def test_chunked_transfer(self):
630
#        cname = self.container
631
#        oname = '/%s' % get_random_name()
632
#        data = get_random_data()
633
#        url = join_urls(self.pithos_path, self.user, cname, oname)
634
#        r = self.put(url, data=data, HTTP_TRANSFER_ENCODING='chunked')
635
#        self.assertEqual(r.status_code, 201)
636
#        self.assertTrue('ETag' in r)
637
#        self.assertTrue('X-Object-Version' in r)
638

    
639
    def test_manifestation(self):
640
        cname = self.container
641
        prefix = 'myobject/'
642
        data = ''
643
        for i in range(random.randint(2, 10)):
644
            part = '%s%d' % (prefix, i)
645
            data += self.upload_object(cname, oname=part)[1]
646

    
647
        manifest = '%s/%s' % (cname, prefix)
648
        oname = get_random_name()
649
        url = join_urls(self.pithos_path, self.user, cname, oname)
650
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
651
        self.assertEqual(r.status_code, 201)
652

    
653
        # assert object exists
654
        r = self.get(url)
655
        self.assertEqual(r.status_code, 200)
656

    
657
        # assert its content
658
        self.assertEqual(r.content, data)
659

    
660
        # invalid manifestation
661
        invalid_manifestation = '%s/%s' % (cname, get_random_name())
662
        self.put(url, data='', HTTP_X_OBJECT_MANIFEST=invalid_manifestation)
663
        r = self.get(url)
664
        self.assertEqual(r.content, '')
665

    
666
    def test_create_zero_length_object(self):
667
        cname = self.container
668
        oname = get_random_name()
669
        url = join_urls(self.pithos_path, self.user, cname, oname)
670
        r = self.put(url, data='')
671
        self.assertEqual(r.status_code, 201)
672

    
673
        r = self.get(url)
674
        self.assertEqual(r.status_code, 200)
675
        self.assertEqual(int(r['Content-Length']), 0)
676
        self.assertEqual(r.content, '')
677

    
678
        r = self.get('%s?hashmap=&format=json' % url)
679
        self.assertEqual(r.status_code, 200)
680
        body = json.loads(r.content)
681
        hashes = body['hashes']
682
        hash = merkle('')
683
        self.assertEqual(hashes, [hash])
684

    
685
    def test_create_object_by_hashmap(self):
686
        cname = self.container
687
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
688

    
689
        # upload an object
690
        oname, data = self.upload_object(cname, length=block_size + 1)[:-1]
691
        # get it hashmap
692
        url = join_urls(self.pithos_path, self.user, cname, oname)
693
        r = self.get('%s?hashmap=&format=json' % url)
694

    
695
        oname = get_random_name()
696
        url = join_urls(self.pithos_path, self.user, cname, oname)
697
        r = self.put('%s?hashmap=' % url, data=r.content)
698
        self.assertEqual(r.status_code, 201)
699

    
700
        r = self.get(url)
701
        self.assertEqual(r.status_code, 200)
702
        self.assertEqual(r.content, data)
703

    
704

    
705
class ObjectCopy(PithosAPITest):
706
    def setUp(self):
707
        PithosAPITest.setUp(self)
708
        self.container = 'c1'
709
        self.create_container(self.container)
710
        self.object, self.data = self.upload_object(self.container)[:-1]
711

    
712
        url = join_urls(
713
            self.pithos_path, self.user, self.container, self.object)
714
        r = self.head(url)
715
        self.etag = r['X-Object-Hash']
716

    
717
    def test_copy(self):
718
        with AssertMappingInvariant(self.get_object_info, self.container,
719
                                    self.object):
720
            # copy object
721
            oname = get_random_name()
722
            url = join_urls(self.pithos_path, self.user, self.container, oname)
723
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
724
                         HTTP_X_COPY_FROM='/%s/%s' % (
725
                             self.container, self.object))
726

    
727
            # assert copy success
728
            self.assertEqual(r.status_code, 201)
729

    
730
            # assert access the new object
731
            r = self.head(url)
732
            self.assertEqual(r.status_code, 200)
733
            self.assertTrue('X-Object-Meta-Test' in r)
734
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
735

    
736
            # assert etag is the same
737
            self.assertTrue('X-Object-Hash' in r)
738
            self.assertEqual(r['X-Object-Hash'], self.etag)
739

    
740
    def test_copy_from_different_container(self):
741
        cname = 'c2'
742
        self.create_container(cname)
743
        with AssertMappingInvariant(self.get_object_info, self.container,
744
                                    self.object):
745
            oname = get_random_name()
746
            url = join_urls(self.pithos_path, self.user, cname, oname)
747
            r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
748
                         HTTP_X_COPY_FROM='/%s/%s' % (
749
                             self.container, self.object))
750

    
751
            # assert copy success
752
            self.assertEqual(r.status_code, 201)
753

    
754
            # assert access the new object
755
            r = self.head(url)
756
            self.assertEqual(r.status_code, 200)
757
            self.assertTrue('X-Object-Meta-Test' in r)
758
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
759

    
760
            # assert etag is the same
761
            self.assertTrue('X-Object-Hash' in r)
762
            self.assertEqual(r['X-Object-Hash'], self.etag)
763

    
764
    def test_copy_invalid(self):
765
        # copy from non-existent object
766
        oname = get_random_name()
767
        url = join_urls(self.pithos_path, self.user, self.container, oname)
768
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
769
                     HTTP_X_COPY_FROM='/%s/%s' % (
770
                         self.container, get_random_name()))
771
        self.assertEqual(r.status_code, 404)
772

    
773
        # copy from non-existent container
774
        oname = get_random_name()
775
        url = join_urls(self.pithos_path, self.user, self.container, oname)
776
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
777
                     HTTP_X_COPY_FROM='/%s/%s' % (
778
                         get_random_name(), self.object))
779
        self.assertEqual(r.status_code, 404)
780

    
781
    def test_copy_dir(self):
782
        folder = self.create_folder(self.container)[0]
783
        subfolder = self.create_folder(
784
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
785
        objects = [subfolder]
786
        append = objects.append
787
        append(self.upload_object(self.container,
788
                                  '%s/%s' % (folder, get_random_name()),
789
                                  depth='1')[0])
790
        append(self.upload_object(self.container,
791
                                  '%s/%s' % (subfolder, get_random_name()),
792
                                  depth='2')[0])
793
        other = self.upload_object(self.container, strnextling(folder))[0]
794

    
795
        # copy dir
796
        copy_folder = self.create_folder(self.container)[0]
797
        url = join_urls(self.pithos_path, self.user, self.container,
798
                        copy_folder)
799
        r = self.put('%s?delimiter=/' % url, data='',
800
                     HTTP_X_COPY_FROM='/%s/%s' % (self.container, folder))
801
        self.assertEqual(r.status_code, 201)
802

    
803
        for obj in objects:
804
            # assert object exists
805
            url = join_urls(self.pithos_path, self.user, self.container,
806
                            obj.replace(folder, copy_folder))
807
            r = self.head(url)
808
            self.assertEqual(r.status_code, 200)
809

    
810
            # assert metadata
811
            meta = self.get_object_meta(self.container, obj)
812
            for k in meta.keys():
813
                key = 'X-Object-Meta-%s' % k
814
                self.assertTrue(key in r)
815
                self.assertEqual(r[key], meta[k])
816

    
817
        # assert other has not been created under copy folder
818
        url = join_urls(self.pithos_path, self.user, self.container,
819
                        '%s/%s' % (copy_folder,
820
                                   other.replace(folder, copy_folder)))
821
        r = self.head(url)
822
        self.assertEqual(r.status_code, 404)
823

    
824

    
825
class ObjectMove(PithosAPITest):
826
    def setUp(self):
827
        PithosAPITest.setUp(self)
828
        self.container = 'c1'
829
        self.create_container(self.container)
830
        self.object, self.data = self.upload_object(self.container)[:-1]
831

    
832
        url = join_urls(
833
            self.pithos_path, self.user, self.container, self.object)
834
        r = self.head(url)
835
        self.etag = r['X-Object-Hash']
836

    
837
    def test_move(self):
838
        # move object
839
        oname = get_random_name()
840
        url = join_urls(self.pithos_path, self.user, self.container, oname)
841
        r = self.put(url, data='', HTTP_X_OBJECT_META_TEST='testcopy',
842
                     HTTP_X_MOVE_FROM='/%s/%s' % (
843
                         self.container, self.object))
844

    
845
        # assert move success
846
        self.assertEqual(r.status_code, 201)
847

    
848
        # assert access the new object
849
        r = self.head(url)
850
        self.assertEqual(r.status_code, 200)
851
        self.assertTrue('X-Object-Meta-Test' in r)
852
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
853

    
854
        # assert etag is the same
855
        self.assertTrue('X-Object-Hash' in r)
856

    
857
        # assert the initial object has been deleted
858
        url = join_urls(self.pithos_path, self.user, self.container,
859
                        self.object)
860
        r = self.head(url)
861
        self.assertEqual(r.status_code, 404)
862

    
863
    def test_move_dir(self):
864
        folder = self.create_folder(self.container)[0]
865
        subfolder = self.create_folder(
866
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
867
        objects = [subfolder]
868
        append = objects.append
869
        append(self.upload_object(self.container,
870
                                  '%s/%s' % (folder, get_random_name()),
871
                                  depth='1')[0])
872
        append(self.upload_object(self.container,
873
                                  '%s/%s' % (subfolder, get_random_name()),
874
                                  depth='1')[0])
875
        other = self.upload_object(self.container, strnextling(folder))[0]
876

    
877
        # move dir
878
        copy_folder = self.create_folder(self.container)[0]
879
        url = join_urls(self.pithos_path, self.user, self.container,
880
                        copy_folder)
881
        r = self.put('%s?delimiter=/' % url, data='',
882
                     HTTP_X_MOVE_FROM='/%s/%s' % (self.container, folder))
883
        self.assertEqual(r.status_code, 201)
884

    
885
        for obj in objects:
886
            # assert initial object does not exist
887
            url = join_urls(self.pithos_path, self.user, self.container, obj)
888
            r = self.head(url)
889
            self.assertEqual(r.status_code, 404)
890

    
891
            # assert new object was created
892
            url = join_urls(self.pithos_path, self.user, self.container,
893
                            obj.replace(folder, copy_folder))
894
            r = self.head(url)
895
            self.assertEqual(r.status_code, 200)
896

    
897
        # assert other has not been created under copy folder
898
        url = join_urls(self.pithos_path, self.user, self.container,
899
                        '%s/%s' % (copy_folder,
900
                                   other.replace(folder, copy_folder)))
901
        r = self.head(url)
902
        self.assertEqual(r.status_code, 404)
903

    
904

    
905
class ObjectPost(PithosAPITest):
906
    def setUp(self):
907
        PithosAPITest.setUp(self)
908
        self.container = 'c1'
909
        self.create_container(self.container)
910
        self.object, self.object_data = self.upload_object(self.container)[:2]
911

    
912
    def test_update_meta(self):
913
        with AssertUUidInvariant(self.get_object_info,
914
                                 self.container,
915
                                 self.object):
916
            # update metadata
917
            d = {'a' * 114: 'b' * 256}
918
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
919
                          k, v in d.items())
920
            url = join_urls(self.pithos_path, self.user, self.container,
921
                            self.object)
922
            r = self.post(url, content_type='', **kwargs)
923
            self.assertEqual(r.status_code, 202)
924

    
925
            # assert metadata have been updated
926
            meta = self.get_object_meta(self.container, self.object)
927

    
928
            for k, v in d.items():
929
                self.assertTrue(k.title() in meta)
930
                self.assertTrue(meta[k.title()], v)
931

    
932
            # Header key too large
933
            d = {'a' * 115: 'b' * 256}
934
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
935
                          k, v in d.items())
936
            r = self.post(url, content_type='', **kwargs)
937
            self.assertEqual(r.status_code, 400)
938

    
939
            # Header value too large
940
            d = {'a' * 114: 'b' * 257}
941
            kwargs = dict(('HTTP_X_OBJECT_META_%s' % k, v) for
942
                          k, v in d.items())
943
            r = self.post(url, content_type='', **kwargs)
944
            self.assertEqual(r.status_code, 400)
945

    
946
#            # Check utf-8 meta
947
#            d = {'α' * (114 / 2): 'β' * (256 / 2)}
948
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
949
#                          k, v in d.items())
950
#            url = join_urls(self.pithos_path, self.user, self.container,
951
#                            self.object)
952
#            r = self.post(url, content_type='', **kwargs)
953
#            self.assertEqual(r.status_code, 202)
954
#
955
#            # assert metadata have been updated
956
#            meta = self.get_object_meta(self.container, self.object)
957
#
958
#            for k, v in d.items():
959
#                key = 'X-Object-Meta-%s' % k.title()
960
#                self.assertTrue(key in meta)
961
#                self.assertTrue(meta[key], v)
962
#
963
#            # Header key too large
964
#            d = {'α' * 114: 'β' * (256 / 2)}
965
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
966
#                          k, v in d.items())
967
#            r = self.post(url, content_type='', **kwargs)
968
#            self.assertEqual(r.status_code, 400)
969
#
970
#            # Header value too large
971
#            d = {'α' * 114: 'β' * 256}
972
#            kwargs = dict(('HTTP_X_OBJECT_META_%s' % quote(k), quote(v)) for
973
#                          k, v in d.items())
974
#            r = self.udpate(url, content_type='', **kwargs)
975
#            self.assertEqual(r.status_code, 400)
976

    
977
    def test_update_object(self):
978
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
979
        oname, odata = self.upload_object(
980
            self.container, length=random.randint(
981
                block_size + 1, 2 * block_size))[:2]
982

    
983
        length = len(odata)
984
        first_byte_pos = random.randint(1, block_size)
985
        last_byte_pos = random.randint(block_size + 1, length - 1)
986
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
987
        kwargs = {'content_type': 'application/octet-stream',
988
                  'HTTP_CONTENT_RANGE': range}
989

    
990
        url = join_urls(self.pithos_path, self.user, self.container, oname)
991
        partial = last_byte_pos - first_byte_pos + 1
992
        data = get_random_data(partial)
993
        r = self.post(url, data=data, **kwargs)
994

    
995
        self.assertEqual(r.status_code, 204)
996
        self.assertTrue('ETag' in r)
997
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
998
                                     data)
999
        if pithos_settings.UPDATE_MD5:
1000
            etag = md5_hash(updated_data)
1001
        else:
1002
            etag = merkle(updated_data)
1003
        #self.assertEqual(r['ETag'], etag)
1004

    
1005
        # check modified object
1006
        r = self.get(url)
1007

    
1008
        self.assertEqual(r.status_code, 200)
1009
        self.assertEqual(r.content, updated_data)
1010
        self.assertEqual(etag, r['ETag'])
1011

    
1012
    def test_update_object_divided_by_blocksize(self):
1013
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1014
        oname, odata = self.upload_object(self.container,
1015
                                          length=2 * block_size)[:2]
1016

    
1017
        length = len(odata)
1018
        first_byte_pos = block_size
1019
        last_byte_pos = 2 * block_size - 1
1020
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1021
        kwargs = {'content_type': 'application/octet-stream',
1022
                  'HTTP_CONTENT_RANGE': range}
1023

    
1024
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1025
        partial = last_byte_pos - first_byte_pos + 1
1026
        data = get_random_data(partial)
1027
        r = self.post(url, data=data, **kwargs)
1028

    
1029
        self.assertEqual(r.status_code, 204)
1030
        self.assertTrue('ETag' in r)
1031
        updated_data = odata.replace(odata[first_byte_pos: last_byte_pos + 1],
1032
                                     data)
1033
        if pithos_settings.UPDATE_MD5:
1034
            etag = md5_hash(updated_data)
1035
        else:
1036
            etag = merkle(updated_data)
1037
        #self.assertEqual(r['ETag'], etag)
1038

    
1039
        # check modified object
1040
        r = self.get(url)
1041

    
1042
        self.assertEqual(r.status_code, 200)
1043
        self.assertEqual(r.content, updated_data)
1044
        self.assertEqual(etag, r['ETag'])
1045

    
1046
    def test_update_object_invalid_content_length(self):
1047
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1048
        oname, odata = self.upload_object(
1049
            self.container, length=random.randint(
1050
                block_size + 1, 2 * block_size))[:2]
1051

    
1052
        length = len(odata)
1053
        first_byte_pos = random.randint(1, block_size)
1054
        last_byte_pos = random.randint(block_size + 1, length - 1)
1055
        partial = last_byte_pos - first_byte_pos + 1
1056
        data = get_random_data(partial)
1057
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1058
        kwargs = {'content_type': 'application/octet-stream',
1059
                  'HTTP_CONTENT_RANGE': range,
1060
                  'CONTENT_LENGTH': str(partial + 1)}
1061

    
1062
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1063
        r = self.post(url, data=data, **kwargs)
1064

    
1065
        self.assertEqual(r.status_code, 400)
1066

    
1067
    def test_update_object_invalid_range(self):
1068
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1069
        oname, odata = self.upload_object(
1070
            self.container, length=random.randint(block_size + 1,
1071
                                                  2 * block_size))[:2]
1072

    
1073
        length = len(odata)
1074
        first_byte_pos = random.randint(1, block_size)
1075
        last_byte_pos = first_byte_pos - 1
1076
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1077
        kwargs = {'content_type': 'application/octet-stream',
1078
                  'HTTP_CONTENT_RANGE': range}
1079

    
1080
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1081
        r = self.post(url, data=get_random_data(), **kwargs)
1082

    
1083
        self.assertEqual(r.status_code, 416)
1084

    
1085
    def test_update_object_out_of_limits(self):
1086
        block_size = pithos_settings.BACKEND_BLOCK_SIZE
1087
        oname, odata = self.upload_object(
1088
            self.container, length=random.randint(block_size + 1,
1089
                                                  2 * block_size))[:2]
1090

    
1091
        length = len(odata)
1092
        first_byte_pos = random.randint(1, block_size)
1093
        last_byte_pos = length + 1
1094
        range = 'bytes %s-%s/%s' % (first_byte_pos, last_byte_pos, length)
1095
        kwargs = {'content_type': 'application/octet-stream',
1096
                  'HTTP_CONTENT_RANGE': range}
1097

    
1098
        url = join_urls(self.pithos_path, self.user, self.container, oname)
1099
        r = self.post(url, data=get_random_data(), **kwargs)
1100

    
1101
        self.assertEqual(r.status_code, 416)
1102

    
1103
    def test_append(self):
1104
        data = get_random_data()
1105
        length = len(data)
1106
        url = join_urls(self.pithos_path, self.user, self.container,
1107
                        self.object)
1108
        r = self.post(url, data=data, content_type='application/octet-stream',
1109
                      HTTP_CONTENT_LENGTH=str(length),
1110
                      HTTP_CONTENT_RANGE='bytes */*')
1111
        self.assertEqual(r.status_code, 204)
1112

    
1113
        r = self.get(url)
1114
        content = r.content
1115
        self.assertEqual(len(content), len(self.object_data) + length)
1116
        self.assertEqual(content, self.object_data + data)
1117

    
1118
    # TODO Fix the test
1119
    def _test_update_with_chunked_transfer(self):
1120
        data = get_random_data()
1121
        length = len(data)
1122

    
1123
        url = join_urls(self.pithos_path, self.user, self.container,
1124
                        self.object)
1125
        r = self.post(url, data=data, content_type='application/octet-stream',
1126
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1127
                      HTTP_TRANSFER_ENCODING='chunked')
1128
        self.assertEqual(r.status_code, 204)
1129

    
1130
        # check modified object
1131
        r = self.get(url)
1132
        content = r.content
1133
        self.assertEqual(content[0:length], data)
1134
        self.assertEqual(content[length:], self.object_data[length:])
1135

    
1136
    def test_update_from_other_object(self):
1137
        src = self.object
1138
        dest = get_random_name()
1139

    
1140
        url = join_urls(self.pithos_path, self.user, self.container, src)
1141
        r = self.get(url)
1142
        source_data = r.content
1143
        source_meta = self.get_object_info(self.container, src)
1144

    
1145
        # update zero length object
1146
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1147
        r = self.put(url, data='')
1148
        self.assertEqual(r.status_code, 201)
1149

    
1150
        r = self.post(url,
1151
                      HTTP_CONTENT_RANGE='bytes */*',
1152
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1153
        self.assertEqual(r.status_code, 204)
1154

    
1155
        r = self.get(url)
1156
        dest_data = r.content
1157
        dest_meta = self.get_object_info(self.container, dest)
1158

    
1159
        self.assertEqual(source_data, dest_data)
1160
        #self.assertEqual(source_meta['ETag'], dest_meta['ETag'])
1161
        self.assertEqual(source_meta['X-Object-Hash'],
1162
                         dest_meta['X-Object-Hash'])
1163
        self.assertTrue(
1164
            source_meta['X-Object-UUID'] != dest_meta['X-Object-UUID'])
1165

    
1166
    def test_update_range_from_other_object(self):
1167
        src = self.object
1168
        dest = get_random_name()
1169

    
1170
        url = join_urls(self.pithos_path, self.user, self.container, src)
1171
        r = self.get(url)
1172
        source_data = r.content
1173

    
1174
        # update zero length object
1175
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1176
        initial_data = get_random_data()
1177
        length = len(initial_data)
1178
        r = self.put(url, data=initial_data)
1179
        self.assertEqual(r.status_code, 201)
1180

    
1181
        offset = random.randint(1, length - 2)
1182
        upto = random.randint(offset, length - 1)
1183
        r = self.post(url,
1184
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1185
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container, src))
1186
        self.assertEqual(r.status_code, 204)
1187

    
1188
        r = self.get(url)
1189
        content = r.content
1190
        self.assertEqual(content, (initial_data[:offset] +
1191
                                   source_data[:upto - offset + 1] +
1192
                                   initial_data[upto + 1:]))
1193

    
1194
    def test_update_range_from_invalid_other_object(self):
1195
        src = self.object
1196
        dest = get_random_name()
1197

    
1198
        url = join_urls(self.pithos_path, self.user, self.container, src)
1199
        r = self.get(url)
1200

    
1201
        # update zero length object
1202
        url = join_urls(self.pithos_path, self.user, self.container, dest)
1203
        initial_data = get_random_data()
1204
        length = len(initial_data)
1205
        r = self.put(url, data=initial_data)
1206
        self.assertEqual(r.status_code, 201)
1207

    
1208
        offset = random.randint(1, length - 2)
1209
        upto = random.randint(offset, length - 1)
1210

    
1211
        # source object does not start with /
1212
        r = self.post(url,
1213
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1214
                      HTTP_X_SOURCE_OBJECT='%s/%s' % (self.container, src))
1215
        self.assertEqual(r.status_code, 400)
1216

    
1217
        # source object does not exist
1218
        r = self.post(url,
1219
                      HTTP_CONTENT_RANGE='bytes %s-%s/*' % (offset, upto),
1220
                      HTTP_X_SOURCE_OBJECT='/%s/%s1' % (self.container, src))
1221
        self.assertEqual(r.status_code, 404)
1222

    
1223
    def test_restore_version(self):
1224
        info = self.get_object_info(self.container, self.object)
1225
        v = []
1226
        append = v.append
1227
        append((info['X-Object-Version'],
1228
                int(info['Content-Length']),
1229
                self.object_data))
1230

    
1231
        # update object
1232
        data, r = self.upload_object(self.container, self.object,
1233
                                     length=v[0][1] - 1)[1:]
1234
        self.assertTrue('X-Object-Version' in r)
1235
        append((r['X-Object-Version'], len(data), data))
1236
        # v[0][1] > v[1][1]
1237

    
1238
        # update with the previous version
1239
        url = join_urls(self.pithos_path, self.user, self.container,
1240
                        self.object)
1241
        r = self.post(url,
1242
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1243
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1244
                                                       self.object),
1245
                      HTTP_X_SOURCE_VERSION=v[0][0],
1246
                      HTTP_X_OBJECT_BYTES=str(v[0][1]))
1247
        self.assertEqual(r.status_code, 204)
1248
        # v[2][1] = v[0][1] > v[1][1]
1249

    
1250
        # check content
1251
        r = self.get(url)
1252
        content = r.content
1253
        self.assertEqual(len(content), v[0][1])
1254
        self.assertEqual(content, self.object_data)
1255
        append((r['X-Object-Version'], len(content), content))
1256

    
1257
        # update object content(v4) > content(v2)
1258
        data, r = self.upload_object(self.container, self.object,
1259
                                     length=v[2][1] + 1)[1:]
1260
        self.assertTrue('X-Object-Version' in r)
1261
        append((r['X-Object-Version'], len(data), data))
1262
        # v[3][1] > v[2][1] = v[0][1] > v[1][1]
1263

    
1264
        # update with the previous version
1265
        r = self.post(url,
1266
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1267
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1268
                                                       self.object),
1269
                      HTTP_X_SOURCE_VERSION=v[2][0],
1270
                      HTTP_X_OBJECT_BYTES=str(v[2][1]))
1271
        self.assertEqual(r.status_code, 204)
1272
        # v[3][1] > v[4][1] = v[2][1] = v[0][1] > v[1][1]
1273

    
1274
        # check content
1275
        r = self.get(url)
1276
        data = r.content
1277
        self.assertEqual(data, v[2][2])
1278
        append((r['X-Object-Version'], len(data), data))
1279

    
1280

    
1281
class ObjectDelete(PithosAPITest):
1282
    def setUp(self):
1283
        PithosAPITest.setUp(self)
1284
        self.container = 'c1'
1285
        self.create_container(self.container)
1286
        self.object, self.object_data = self.upload_object(self.container)[:2]
1287

    
1288
    def test_delete(self):
1289
        url = join_urls(self.pithos_path, self.user, self.container,
1290
                        self.object)
1291
        r = self.delete(url)
1292
        self.assertEqual(r.status_code, 204)
1293

    
1294
        r = self.head(url)
1295
        self.assertEqual(r.status_code, 404)
1296

    
1297
    def test_delete_non_existent(self):
1298
        url = join_urls(self.pithos_path, self.user, self.container,
1299
                        get_random_name())
1300
        r = self.delete(url)
1301
        self.assertEqual(r.status_code, 404)
1302

    
1303
    def test_delete_dir(self):
1304
        folder = self.create_folder(self.container)[0]
1305
        subfolder = self.create_folder(
1306
            self.container, oname='%s/%s' % (folder, get_random_name()))[0]
1307
        objects = [subfolder]
1308
        append = objects.append
1309
        append(self.upload_object(self.container,
1310
                                  '%s/%s' % (folder, get_random_name()),
1311
                                  depth='1')[0])
1312
        append(self.upload_object(self.container,
1313
                                  '%s/%s' % (subfolder, get_random_name()),
1314
                                  depth='2')[0])
1315
        other = self.upload_object(self.container, strnextling(folder))[0]
1316

    
1317
        # move dir
1318
        url = join_urls(self.pithos_path, self.user, self.container, folder)
1319
        r = self.delete('%s?delimiter=/' % url)
1320
        self.assertEqual(r.status_code, 204)
1321

    
1322
        for obj in objects:
1323
            # assert object does not exist
1324
            url = join_urls(self.pithos_path, self.user, self.container, obj)
1325
            r = self.head(url)
1326
            self.assertEqual(r.status_code, 404)
1327

    
1328
        # assert other has not been deleted
1329
        url = join_urls(self.pithos_path, self.user, self.container, other)
1330
        r = self.head(url)
1331
        self.assertEqual(r.status_code, 200)