Revision efb1f3d3

b/snf-pithos-app/pithos/api/public.py
37 37
from snf_django.lib import api
38 38
from snf_django.lib.api import faults
39 39

  
40
from pithos.api.settings import UNSAFE_DOMAIN
40
from pithos.api.settings import UNSAFE_DOMAIN, UPDATE_MD5
41 41
from pithos.api.util import (put_object_headers, update_manifest_meta,
42 42
                             validate_modification_preconditions,
43 43
                             validate_matching_preconditions,
......
123 123
        validate_matching_preconditions(request, meta)
124 124
    except faults.NotModified:
125 125
        response = HttpResponse(status=304)
126
        response['ETag'] = meta['ETag']
126
        response['ETag'] = meta['hash'] if not UPDATE_MD5 else meta['checksum']
127 127
        return response
128 128

  
129 129
    sizes = []
b/snf-pithos-app/pithos/api/test/public.py
34 34
import random
35 35
import datetime
36 36
import time as _time
37
import re
38

  
39
from functools import partial
37 40

  
38 41
from synnefo.lib import join_urls
39 42

  
40 43
import django.utils.simplejson as json
41 44

  
42
from pithos.api.test import PithosAPITest
43
from pithos.api.test.util import get_random_name
45
from pithos.api.test import (PithosAPITest, DATE_FORMATS, TEST_BLOCK_SIZE,
46
                             TEST_HASH_ALGORITHM)
47

  
48
from pithos.api.test.util import (get_random_name, get_random_data, md5_hash,
49
                                  merkle)
44 50
from pithos.api import settings as pithos_settings
45 51

  
52
merkle = partial(merkle,
53
                 blocksize=TEST_BLOCK_SIZE,
54
                 blockhash=TEST_HASH_ALGORITHM)
55

  
46 56

  
47 57
class TestPublic(PithosAPITest):
48 58
    def _assert_not_public_object(self, cname, oname):
......
249 259
        self.assertTrue(r.content, data)
250 260
        #self.assertTrue('X-Object-Manifest' in r)
251 261
        #self.assertEqual(r['X-Object-Manifest'], manifest)
262

  
263
    def test_public_get_partial(self):
264
        cname = self.create_container()[0]
265
        oname, odata = self.upload_object(cname, length=512)[:-1]
266

  
267
        # set public
268
        url = join_urls(self.pithos_path, self.user, cname, oname)
269
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
270
        self.assertEqual(r.status_code, 202)
271

  
272
        info = self.get_object_info(cname, oname)
273
        public_url = info['X-Object-Public']
274

  
275
        r = self.get(public_url, HTTP_RANGE='bytes=0-499')
276
        self.assertEqual(r.status_code, 206)
277
        data = r.content
278
        self.assertEqual(data, odata[:500])
279
        self.assertTrue('Content-Range' in r)
280
        self.assertEqual(r['Content-Range'], 'bytes 0-499/%s' % len(odata))
281
        self.assertTrue('Content-Type' in r)
282
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
283

  
284
    def test_public_get_final_500(self):
285
        cname = self.create_container()[0]
286
        oname, odata = self.upload_object(cname, length=512)[:-1]
287
        size = len(odata)
288

  
289
        # set public
290
        url = join_urls(self.pithos_path, self.user, cname, oname)
291
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
292
        self.assertEqual(r.status_code, 202)
293

  
294
        info = self.get_object_info(cname, oname)
295
        public_url = info['X-Object-Public']
296

  
297
        r = self.get(public_url, HTTP_RANGE='bytes=-500')
298
        self.assertEqual(r.status_code, 206)
299
        self.assertEqual(r.content, odata[-500:])
300
        self.assertTrue('Content-Range' in r)
301
        self.assertEqual(r['Content-Range'],
302
                         'bytes %s-%s/%s' % (size - 500, size - 1, size))
303
        self.assertTrue('Content-Type' in r)
304
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
305

  
306
    def test_public_get_rest(self):
307
        cname = self.create_container()[0]
308
        oname, odata = self.upload_object(cname, length=512)[:-1]
309
        size = len(odata)
310
        offset = len(odata) - random.randint(1, 512)
311

  
312
        # set public
313
        url = join_urls(self.pithos_path, self.user, cname, oname)
314
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
315
        self.assertEqual(r.status_code, 202)
316

  
317
        info = self.get_object_info(cname, oname)
318
        public_url = info['X-Object-Public']
319

  
320
        r = self.get(public_url, HTTP_RANGE='bytes=%s-' % offset)
321
        self.assertEqual(r.status_code, 206)
322
        self.assertEqual(r.content, odata[offset:])
323
        self.assertTrue('Content-Range' in r)
324
        self.assertEqual(r['Content-Range'],
325
                         'bytes %s-%s/%s' % (offset, size - 1, size))
326
        self.assertTrue('Content-Type' in r)
327
        self.assertTrue(r['Content-Type'], 'application/octet-stream')
328

  
329
    def test_public_get_range_not_satisfiable(self):
330
        cname = self.create_container()[0]
331
        oname, odata = self.upload_object(cname, length=512)[:-1]
332
        url = join_urls(self.pithos_path, self.user, cname, oname)
333

  
334
        offset = len(odata) + 1
335

  
336
        # set public
337
        url = join_urls(self.pithos_path, self.user, cname, oname)
338
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
339
        self.assertEqual(r.status_code, 202)
340

  
341
        info = self.get_object_info(cname, oname)
342
        public_url = info['X-Object-Public']
343

  
344
        r = self.get(public_url, HTTP_RANGE='bytes=0-%s' % offset)
345
        self.assertEqual(r.status_code, 416)
346

  
347
    def test_public_multiple_range(self):
348
        cname = self.create_container()[0]
349
        oname, odata = self.upload_object(cname)[:-1]
350
        url = join_urls(self.pithos_path, self.user, cname, oname)
351

  
352
        # set public
353
        url = join_urls(self.pithos_path, self.user, cname, oname)
354
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
355
        self.assertEqual(r.status_code, 202)
356

  
357
        info = self.get_object_info(cname, oname)
358
        public_url = info['X-Object-Public']
359

  
360
        l = ['0-499', '-500', '1000-']
361
        ranges = 'bytes=%s' % ','.join(l)
362
        r = self.get(public_url, HTTP_RANGE=ranges)
363
        self.assertEqual(r.status_code, 206)
364
        self.assertTrue('content-type' in r)
365
        p = re.compile(
366
            'multipart/byteranges; boundary=(?P<boundary>[0-9a-f]{32}\Z)',
367
            re.I)
368
        m = p.match(r['content-type'])
369
        if m is None:
370
            self.fail('Invalid multiple range content type')
371
        boundary = m.groupdict()['boundary']
372
        cparts = r.content.split('--%s' % boundary)[1:-1]
373

  
374
        # assert content parts length
375
        self.assertEqual(len(cparts), len(l))
376

  
377
        # for each content part assert headers
378
        i = 0
379
        for cpart in cparts:
380
            content = cpart.split('\r\n')
381
            headers = content[1:3]
382
            content_range = headers[0].split(': ')
383
            self.assertEqual(content_range[0], 'Content-Range')
384

  
385
            r = l[i].split('-')
386
            if not r[0] and not r[1]:
387
                pass
388
            elif not r[0]:
389
                start = len(odata) - int(r[1])
390
                end = len(odata)
391
            elif not r[1]:
392
                start = int(r[0])
393
                end = len(odata)
394
            else:
395
                start = int(r[0])
396
                end = int(r[1]) + 1
397
            fdata = odata[start:end]
398
            sdata = '\r\n'.join(content[4:-1])
399
            self.assertEqual(len(fdata), len(sdata))
400
            self.assertEquals(fdata, sdata)
401
            i += 1
402

  
403
    def test_public_multiple_range_not_satisfiable(self):
404
        # perform get with multiple range
405
        cname = self.create_container()[0]
406
        oname, odata = self.upload_object(cname)[:-1]
407

  
408
        # set public
409
        url = join_urls(self.pithos_path, self.user, cname, oname)
410
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
411
        self.assertEqual(r.status_code, 202)
412

  
413
        info = self.get_object_info(cname, oname)
414
        public_url = info['X-Object-Public']
415

  
416
        out_of_range = len(odata) + 1
417
        l = ['0-499', '-500', '%d-' % out_of_range]
418
        ranges = 'bytes=%s' % ','.join(l)
419
        r = self.get(public_url, HTTP_RANGE=ranges)
420
        self.assertEqual(r.status_code, 416)
421

  
422
    def test_public_get_if_match(self):
423
        cname = self.create_container()[0]
424
        oname, odata = self.upload_object(cname)[:-1]
425

  
426
        # set public
427
        url = join_urls(self.pithos_path, self.user, cname, oname)
428
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
429
        self.assertEqual(r.status_code, 202)
430

  
431
        info = self.get_object_info(cname, oname)
432
        public_url = info['X-Object-Public']
433

  
434
        def assert_matches(etag):
435
            r = self.get(public_url, HTTP_IF_MATCH=etag)
436

  
437
            # assert get success
438
            self.assertEqual(r.status_code, 200)
439

  
440
            # assert response content
441
            self.assertEqual(r.content, odata)
442

  
443
        # perform get with If-Match
444
        if pithos_settings.UPDATE_MD5:
445
            assert_matches(md5_hash(odata))
446
        else:
447
            assert_matches(merkle(odata))
448

  
449
    def test_public_get_if_match_star(self):
450
        cname = self.create_container()[0]
451
        oname, odata = self.upload_object(cname)[:-1]
452

  
453
        # set public
454
        url = join_urls(self.pithos_path, self.user, cname, oname)
455
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
456
        self.assertEqual(r.status_code, 202)
457

  
458
        info = self.get_object_info(cname, oname)
459
        public_url = info['X-Object-Public']
460

  
461
        # perform get with If-Match *
462
        r = self.get(public_url, HTTP_IF_MATCH='*')
463

  
464
        # assert get success
465
        self.assertEqual(r.status_code, 200)
466

  
467
        # assert response content
468
        self.assertEqual(r.content, odata)
469

  
470
    def test_public_get_multiple_if_match(self):
471
        cname = self.create_container()[0]
472
        oname, odata = self.upload_object(cname)[:-1]
473

  
474
        # set public
475
        url = join_urls(self.pithos_path, self.user, cname, oname)
476
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
477
        self.assertEqual(r.status_code, 202)
478

  
479
        info = self.get_object_info(cname, oname)
480
        public_url = info['X-Object-Public']
481

  
482
        def assert_multiple_match(etag):
483
            quoted = lambda s: '"%s"' % s
484
            r = self.get(public_url, HTTP_IF_MATCH=','.join(
485
                [quoted(etag), quoted(get_random_data(64))]))
486

  
487
            # assert get success
488
            self.assertEqual(r.status_code, 200)
489

  
490
            # assert response content
491
            self.assertEqual(r.content, odata)
492

  
493
        # perform get with If-Match
494
        if pithos_settings.UPDATE_MD5:
495
            assert_multiple_match(md5_hash(odata))
496
        else:
497
            assert_multiple_match(merkle(odata))
498

  
499
    def test_public_if_match_precondition_failed(self):
500
        cname = self.create_container()[0]
501
        oname, odata = self.upload_object(cname)[:-1]
502

  
503
        # set public
504
        url = join_urls(self.pithos_path, self.user, cname, oname)
505
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
506
        self.assertEqual(r.status_code, 202)
507

  
508
        info = self.get_object_info(cname, oname)
509
        public_url = info['X-Object-Public']
510

  
511
        # perform get with If-Match
512
        r = self.get(public_url, HTTP_IF_MATCH=get_random_name())
513
        self.assertEqual(r.status_code, 412)
514

  
515
    def test_public_if_none_match(self):
516
        # upload object
517
        cname = self.create_container()[0]
518
        oname, odata = self.upload_object(cname)[:-1]
519

  
520
        # set public
521
        url = join_urls(self.pithos_path, self.user, cname, oname)
522
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
523
        self.assertEqual(r.status_code, 202)
524

  
525
        info = self.get_object_info(cname, oname)
526
        public_url = info['X-Object-Public']
527

  
528
        def assert_non_match(etag):
529
            # perform get with If-None-Match
530
            r = self.get(public_url, HTTP_IF_NONE_MATCH=etag)
531

  
532
            # assert precondition_failed
533
            self.assertEqual(r.status_code, 304)
534

  
535
            # update object data
536
            r = self.append_object_data(cname, oname)[-1]
537
            self.assertTrue(etag != r['ETag'])
538

  
539
            # perform get with If-None-Match
540
            r = self.get(public_url, HTTP_IF_NONE_MATCH=etag)
541

  
542
            # assert get success
543
            self.assertEqual(r.status_code, 200)
544

  
545
        if pithos_settings.UPDATE_MD5:
546
            assert_non_match(md5_hash(odata))
547
        else:
548
            assert_non_match(merkle(odata))
549

  
550
    def test_public_if_none_match_star(self):
551
        # upload object
552
        cname = self.create_container()[0]
553
        oname, odata = self.upload_object(cname)[:-1]
554

  
555
        # set public
556
        url = join_urls(self.pithos_path, self.user, cname, oname)
557
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
558
        self.assertEqual(r.status_code, 202)
559

  
560
        info = self.get_object_info(cname, oname)
561
        public_url = info['X-Object-Public']
562

  
563
        # perform get with If-None-Match with star
564
        r = self.get(public_url, HTTP_IF_NONE_MATCH='*')
565

  
566
        self.assertEqual(r.status_code, 304)
567

  
568
    def test_public_if_modified_sinse(self):
569
        cname = get_random_name()
570
        self.create_container(cname)
571
        oname, odata = self.upload_object(cname)[:-1]
572
        self._assert_not_public_object(cname, oname)
573

  
574
        # set public
575
        url = join_urls(self.pithos_path, self.user, cname, oname)
576
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
577
        self.assertEqual(r.status_code, 202)
578

  
579
        info = self.get_object_info(cname, oname)
580
        public = info['X-Object-Public']
581

  
582
        object_info = self.get_object_info(cname, oname)
583
        last_modified = object_info['Last-Modified']
584
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
585
        t1_formats = map(t1.strftime, DATE_FORMATS)
586

  
587
        for t in t1_formats:
588
            r = self.get(public, user='user2', HTTP_IF_MODIFIED_SINCE=t,
589
                         token=None)
590
            self.assertEqual(r.status_code, 304)
591

  
592
        _time.sleep(1)
593

  
594
        # update object data
595
        appended_data = self.append_object_data(cname, oname)[1]
596

  
597
        # Check modified since
598
        for t in t1_formats:
599
            r = self.get(public, user='user2', HTTP_IF_MODIFIED_SINCE=t,
600
                         token=None)
601
            self.assertEqual(r.status_code, 200)
602
            self.assertEqual(r.content, odata + appended_data)
603

  
604
    def test_public_if_modified_since_invalid_date(self):
605
        cname = self.create_container()[0]
606
        oname, odata = self.upload_object(cname)[:-1]
607

  
608
        # set public
609
        url = join_urls(self.pithos_path, self.user, cname, oname)
610
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
611
        self.assertEqual(r.status_code, 202)
612

  
613
        info = self.get_object_info(cname, oname)
614
        public_url = info['X-Object-Public']
615

  
616
        r = self.get(public_url, HTTP_IF_MODIFIED_SINCE='Monday')
617
        self.assertEqual(r.status_code, 200)
618
        self.assertEqual(r.content, odata)
619

  
620
    def test_public_if_public_not_modified_since(self):
621
        cname = self.create_container()[0]
622
        oname, odata = self.upload_object(cname)[:-1]
623

  
624
        # set public
625
        url = join_urls(self.pithos_path, self.user, cname, oname)
626
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
627
        self.assertEqual(r.status_code, 202)
628

  
629
        info = self.get_object_info(cname, oname)
630
        public_url = info['X-Object-Public']
631
        last_modified = info['Last-Modified']
632
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
633

  
634
        # Check unmodified
635
        t1 = t + datetime.timedelta(seconds=1)
636
        t1_formats = map(t1.strftime, DATE_FORMATS)
637
        for t in t1_formats:
638
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
639
            self.assertEqual(r.status_code, 200)
640
            self.assertEqual(r.content, odata)
641

  
642
        # modify object
643
        _time.sleep(2)
644
        self.append_object_data(cname, oname)
645

  
646
        info = self.get_object_info(cname, oname)
647
        last_modified = info['Last-Modified']
648
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
649
        t2 = t - datetime.timedelta(seconds=1)
650
        t2_formats = map(t2.strftime, DATE_FORMATS)
651

  
652
        # check modified
653
        for t in t2_formats:
654
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
655
            self.assertEqual(r.status_code, 412)
656

  
657
        # modify account: update object meta
658
        _time.sleep(1)
659
        self.update_object_meta(cname, oname, {'foo': 'bar'})
660

  
661
        info = self.get_object_info(cname, oname)
662
        last_modified = info['Last-Modified']
663
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
664
        t3 = t - datetime.timedelta(seconds=1)
665
        t3_formats = map(t3.strftime, DATE_FORMATS)
666

  
667
        # check modified
668
        for t in t3_formats:
669
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=t)
670
            self.assertEqual(r.status_code, 412)
671

  
672
    def test_public_if_unmodified_since(self):
673
        cname = self.create_container()[0]
674
        oname, odata = self.upload_object(cname)[:-1]
675

  
676
        # set public
677
        url = join_urls(self.pithos_path, self.user, cname, oname)
678
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
679
        self.assertEqual(r.status_code, 202)
680

  
681
        info = self.get_object_info(cname, oname)
682
        public_url = info['X-Object-Public']
683
        last_modified = info['Last-Modified']
684
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
685
        t = t + datetime.timedelta(seconds=1)
686
        t_formats = map(t.strftime, DATE_FORMATS)
687

  
688
        for tf in t_formats:
689
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=tf)
690
            self.assertEqual(r.status_code, 200)
691
            self.assertEqual(r.content, odata)
692

  
693
    def test_public_if_unmodified_since_precondition_failed(self):
694
        cname = self.create_container()[0]
695
        oname, odata = self.upload_object(cname)[:-1]
696

  
697
        # set public
698
        url = join_urls(self.pithos_path, self.user, cname, oname)
699
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
700
        self.assertEqual(r.status_code, 202)
701

  
702
        info = self.get_object_info(cname, oname)
703
        public_url = info['X-Object-Public']
704
        last_modified = info['Last-Modified']
705
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
706
        t = t - datetime.timedelta(seconds=1)
707
        t_formats = map(t.strftime, DATE_FORMATS)
708

  
709
        for tf in t_formats:
710
            r = self.get(public_url, HTTP_IF_UNMODIFIED_SINCE=tf)
711
            self.assertEqual(r.status_code, 412)

Also available in: Unified diff