Revision 1f06028c

b/snf-tools/synnefo_tools/burnin/pithos_tests.py
65 65
    large_file = Proper(value=None)
66 66

  
67 67
    def test_005_account_head(self):
68
        """HEAD on pithos account"""
68
        """Test account HEAD"""
69 69
        self._set_pithos_account(self._get_uuid())
70 70
        pithos = self.clients.pithos
71 71
        r = pithos.account_head()
......
89 89
        self.info('If_(un)modified_since is OK')
90 90

  
91 91
    def test_010_account_get(self):
92
        """Test account_get"""
92
        """Test account GET"""
93 93
        self.info('Preparation')
94 94
        pithos = self.clients.pithos
95 95
        for i in range(1, 3):
......
101 101
        pithos.set_object_sharing(obj, read_permission='*')
102 102
        self.info('Created object /%s/%s' % (cont_name, obj))
103 103

  
104
        #  Try to re-create the same container
105
        pithos.create_container(cont_name)
106

  
104 107
        r = pithos.list_containers()
105 108
        fullLen = len(r)
106 109
        self.assertTrue(fullLen > 2)
107 110
        self.info('Normal use is OK')
108 111

  
112
        cnames = [c['name'] for c in r]
113
        self.assertEqual(sorted(list(set(cnames))), sorted(cnames))
114
        self.info('Containers have unique names')
115

  
109 116
        r = pithos.account_get(limit=1)
110 117
        self.assertEqual(len(r.json), 1)
111 118
        self.info('Limit works')
......
139 146
        self.info('If_(un)modified_since is OK')
140 147

  
141 148
    def test_015_account_post(self):
142
        """Test account_post"""
149
        """Test account POST"""
143 150
        pithos = self.clients.pithos
144 151
        r = pithos.account_post()
145 152
        self.assertEqual(r.status_code, 202)
......
561 568
            self.assertEqual(self.large_file.read(64), dnl_f.read(64))
562 569
        self.info('Sampling shows that files match')
563 570

  
564
        """Upload a boring file"""
565 571
        self.info('Create a boring file of 42 blocks...')
566 572
        bor_f = self._create_boring_file(42)
567 573
        trg_fname = 'dir/uploaded.file'
......
576 582
        for i in range(42):
577 583
            self.assertEqual(sample_block(bor_f, i), sample_block(dnl_f, i))
578 584

  
579
    def test_150_stop_test(self):
580
        """STOP TESTING ALREADY"""
581
        self.assertTrue(False)
585
    def test_055_object_put(self):
586
        """Test object PUT"""
587
        pithos = self.clients.pithos
588
        obj = 'sample.file'
589

  
590
        pithos.create_object(obj + '.FAKE')
591
        r = pithos.get_object_info(obj + '.FAKE')
592
        self.assertEqual(
593
            set(r['content-type']), set('application/octer-stream'))
594
        self.info('Simple call creates a new object correctly')
595

  
596
        r = pithos.object_put(
597
            obj,
598
            data='a',
599
            content_type='application/octer-stream',
600
            permissions=dict(
601
                read=['accX:groupA', 'u1', 'u2'],
602
                write=['u2', 'u3']),
603
            metadata=dict(key1='val1', key2='val2'),
604
            content_encoding='UTF-8',
605
            content_disposition='attachment; filename="fname.ext"')
606
        self.assertEqual(r.status_code, 201)
607
        self.info('Status code is OK')
608
        etag = r.headers['etag']
609

  
610
        r = pithos.get_object_info(obj)
611
        self.assertTrue('content-disposition' in r)
612
        self.assertEqual(
613
            r['content-disposition'], 'attachment; filename="fname.ext"')
614
        self.info('Content-disposition is OK')
615

  
616
        sharing = r['x-object-sharing'].split('; ')
617
        self.assertTrue(sharing[0].startswith('read='))
618
        read = set(sharing[0][5:].split(','))
619
        self.assertEqual(set(('u1', 'accx:groupa')), read)
620
        self.assertTrue(sharing[1].startswith('write='))
621
        write = set(sharing[1][6:].split(','))
622
        self.assertEqual(set(('u2', 'u3')), write)
623
        self.info('Permissions are OK')
624

  
625
        r = pithos.get_object_meta(obj)
626
        self.assertEqual(r['x-object-meta-key1'], 'val1')
627
        self.assertEqual(r['x-object-meta-key2'], 'val2')
628
        self.info('Meta are OK')
629

  
630
        pithos.object_put(
631
            obj,
632
            if_etag_match=etag,
633
            data='b',
634
            content_type='application/octet-stream',
635
            public=True)
636
        self.info('If-etag-match is OK')
582 637

  
583
    def test_152_unique_containers(self):
584
        """Test if containers have unique names"""
585
        names = [n['name'] for n in self.containers]
586
        names = sorted(names)
587
        self.assertEqual(sorted(list(set(names))), names)
638
        r = pithos.object_get(obj)
639
        self.assertTrue('x-object-public' in r.headers)
640
        self.info('Publishing works')
641

  
642
        vers2 = int(r.headers['x-object-version'])
643
        etag = r.headers['etag']
644
        self.assertEqual(r.text, 'b')
645
        self.info('Remote object content is correct')
646

  
647
        r = pithos.object_put(
648
            obj,
649
            if_etag_not_match=etag,
650
            data='c',
651
            content_type='application/octet-stream',
652
            success=(201, 412))
653
        self.assertEqual(r.status_code, 412)
654
        self.info('If-etag-not-match is OK')
655

  
656
        r = pithos.get_object_info('dir')
657
        self.assertEqual(r['content-type'], 'application/directory')
658
        self.info('Directory has been created correctly')
659

  
660
        r = pithos.object_put(
661
            '%s_v2' % obj,
662
            format=None,
663
            copy_from='/%s/%s' % (pithos.container, obj),
664
            content_encoding='application/octet-stream',
665
            source_account=pithos.account,
666
            content_length=0,
667
            success=201)
668
        self.assertEqual(r.status_code, 201)
669
        r1 = pithos.get_object_info(obj)
670
        r2 = pithos.get_object_info('%s_v2' % obj)
671
        self.assertEqual(r1['x-object-hash'], r2['x-object-hash'])
672
        self.info('Object has being copied in same container, OK')
673

  
674
        pithos.copy_object(
675
            src_container=pithos.container,
676
            src_object=obj,
677
            dst_container=self.temp_containers[-2],
678
            dst_object='%s_new' % obj)
679
        pithos.container = self.temp_containers[-2]
680
        r1 = pithos.get_object_info('%s_new' % obj)
681
        pithos.container = self.temp_containers[-1]
682
        r2 = pithos.get_object_info(obj)
683
        self.assertEqual(r1['x-object-hash'], r2['x-object-hash'])
684
        self.info('Object has being copied in another container, OK')
685

  
686
        fromstr = '/%s/%s_new' % (self.temp_containers[-2], obj)
687
        r = pithos.object_put(
688
            obj,
689
            format=None,
690
            copy_from=fromstr,
691
            content_encoding='application/octet-stream',
692
            source_account=pithos.account,
693
            content_length=0,
694
            success=201)
695
        self.assertEqual(r.status_code, 201)
696
        self.info('Cross container put accepts content_encoding')
697

  
698
        r = pithos.get_object_info(obj)
699
        self.assertEqual(r['etag'], etag)
700
        self.info('Etag is OK')
701

  
702
        r = pithos.object_put(
703
            '%s_v3' % obj,
704
            format=None,
705
            move_from=fromstr,
706
            content_encoding='application/octet-stream',
707
            source_account='nonExistendAddress@NeverLand.com',
708
            content_length=0,
709
            success=(201, 403))
710
        self.assertEqual(r.status_code, 403)
711
        self.info('Fake source account is handled correctly')
712

  
713
        r1 = pithos.get_object_info(obj)
714
        pithos.container = self.temp_containers[-2]
715
        pithos.move_object(
716
            src_container=self.temp_containers[-1],
717
            src_object=obj,
718
            dst_container=pithos.container,
719
            dst_object=obj + '_new')
720
        r0 = pithos.get_object_info(obj + '_new')
721
        self.assertEqual(r1['x-object-hash'], r0['x-object-hash'])
722
        self.info('Cross container move is OK')
723

  
724
        pithos.object_put(
725
            obj,
726
            format=None,
727
            move_from='/%s/dir/%s' % (self.temp_containers[-1], obj),
728
            source_version=vers2,
729
            content_encoding='application/octet-stream',
730
            content_length=0, success=201)
731
        self.info('Source-version is OK')
732

  
733
        mobj = 'manifest.test'
734
        txt = ''
735
        for i in range(10):
736
            txt += '%s' % i
737
            pithos.object_put(
738
                '%s/%s' % (mobj, i),
739
                data='%s' % i,
740
                content_length=1,
741
                success=201,
742
                content_type='application/octet-stream',
743
                content_encoding='application/octet-stream')
744
        pithos.object_put(
745
            mobj,
746
            content_length=0,
747
            content_type='application/octet-stream',
748
            manifest='%s/%s' % (pithos.container, mobj))
749
        r = pithos.object_get(mobj)
750
        self.assertEqual(r.text, txt)
751
        self.info('Manifest file creation works')
752

  
753
        f = self._create_large_file(1024 * 10)
754
        pithos.upload_object('sample.file', f)
755
        r = pithos.get_object_info('sample.file')
756
        self.assertEqual(int(r['content-length']), 10240)
757
        self.info('Overwrite is OK')
758

  
759
        """MISSING: test transfer-encoding?"""
588 760

  
589 761
    @classmethod
590 762
    def tearDownClass(cls):  # noqa

Also available in: Unified diff