Revision b0d884e9
b/snf-tools/synnefo_tools/burnin/pithos_tests.py | ||
---|---|---|
639 | 639 |
self.assertTrue('x-object-public' in r.headers) |
640 | 640 |
self.info('Publishing works') |
641 | 641 |
|
642 |
vers2 = int(r.headers['x-object-version']) |
|
643 | 642 |
etag = r.headers['etag'] |
644 | 643 |
self.assertEqual(r.text, 'b') |
645 | 644 |
self.info('Remote object content is correct') |
... | ... | |
706 | 705 |
content_encoding='application/octet-stream', |
707 | 706 |
source_account='nonExistendAddress@NeverLand.com', |
708 | 707 |
content_length=0, |
709 |
success=(201, 403)) |
|
710 |
self.assertEqual(r.status_code, 403) |
|
708 |
success=(403, )) |
|
711 | 709 |
self.info('Fake source account is handled correctly') |
712 | 710 |
|
713 | 711 |
r1 = pithos.get_object_info(obj) |
... | ... | |
721 | 719 |
self.assertEqual(r1['x-object-hash'], r0['x-object-hash']) |
722 | 720 |
self.info('Cross container move is OK') |
723 | 721 |
|
722 |
pithos.container = self.temp_containers[-1] |
|
723 |
pithos.create_container(versioning='auto') |
|
724 |
pithos.upload_from_string(obj, 'first version') |
|
725 |
source_hashmap = pithos.get_object_hashmap(obj)['hashes'] |
|
726 |
pithos.upload_from_string(obj, 'second version') |
|
727 |
pithos.upload_from_string(obj, 'third version') |
|
728 |
versions = pithos.get_object_versionlist(obj) |
|
729 |
self.assertEqual(len(versions), 3) |
|
730 |
vers0 = versions[0][0] |
|
731 |
|
|
732 |
pithos.container = self.temp_containers[-2] |
|
724 | 733 |
pithos.object_put( |
725 | 734 |
obj, |
726 | 735 |
format=None, |
727 |
move_from='/%s/dir/%s' % (self.temp_containers[-1], obj),
|
|
728 |
source_version=vers2,
|
|
736 |
move_from='/%s/%s' % (self.temp_containers[-1], obj), |
|
737 |
source_version=vers0,
|
|
729 | 738 |
content_encoding='application/octet-stream', |
730 | 739 |
content_length=0, success=201) |
740 |
target_hashmap = pithos.get_object_hashmap(obj)['hashes'] |
|
741 |
self.assertEqual(source_hashmap, target_hashmap) |
|
731 | 742 |
self.info('Source-version is OK') |
732 | 743 |
|
733 | 744 |
mobj = 'manifest.test' |
... | ... | |
758 | 769 |
|
759 | 770 |
"""MISSING: test transfer-encoding?""" |
760 | 771 |
|
772 |
def test_060_object_copy(self): |
|
773 |
pithos = self.clients.pithos |
|
774 |
obj, trg = 'source.file2copy', 'copied.file' |
|
775 |
data = '{"key1":"val1", "key2":"val2"}' |
|
776 |
|
|
777 |
r = pithos.object_put( |
|
778 |
obj, |
|
779 |
content_type='application/octet-stream', |
|
780 |
data=data, |
|
781 |
metadata=dict(mkey1='mval1', mkey2='mval2'), |
|
782 |
permissions=dict( |
|
783 |
read=['accX:groupA', 'u1', 'u2'], |
|
784 |
write=['u2', 'u3']), |
|
785 |
content_disposition='attachment; filename="fname.ext"') |
|
786 |
self.info('Prepared a file /%s/%s' % (pithos.container, obj)) |
|
787 |
|
|
788 |
r = pithos.object_copy( |
|
789 |
obj, |
|
790 |
destination='/%s/%s' % (pithos.container, trg), |
|
791 |
ignore_content_type=False, content_type='application/json', |
|
792 |
metadata={'mkey2': 'mval2a', 'mkey3': 'mval3'}, |
|
793 |
permissions={'write': ['u5', 'accX:groupB']}) |
|
794 |
self.assertEqual(r.status_code, 201) |
|
795 |
self.info('Status code is OK') |
|
796 |
|
|
797 |
r = pithos.get_object_info(trg) |
|
798 |
self.assertTrue('content-disposition' in r) |
|
799 |
self.info('Content-disposition is OK') |
|
800 |
|
|
801 |
self.assertEqual(r['x-object-meta-mkey1'], 'mval1') |
|
802 |
self.assertEqual(r['x-object-meta-mkey2'], 'mval2a') |
|
803 |
self.assertEqual(r['x-object-meta-mkey3'], 'mval3') |
|
804 |
self.info('Metadata are OK') |
|
805 |
|
|
806 |
r = pithos.get_object_sharing(trg) |
|
807 |
self.assertFalse('read' in r or 'u2' in r['write']) |
|
808 |
self.assertTrue('accx:groupb' in r['write']) |
|
809 |
self.info('Sharing is OK') |
|
810 |
|
|
811 |
r = pithos.object_copy( |
|
812 |
obj, |
|
813 |
destination='/%s/%s' % (pithos.container, obj), |
|
814 |
content_encoding='utf8', |
|
815 |
content_type='application/json', |
|
816 |
destination_account='nonExistendAddress@NeverLand.com', |
|
817 |
success=(201, 404)) |
|
818 |
self.assertEqual(r.status_code, 404) |
|
819 |
self.info('Non existing UUID correctly causes a 404') |
|
820 |
|
|
821 |
"""Check destination being another container |
|
822 |
and also content_type and content encoding""" |
|
823 |
r = pithos.object_copy( |
|
824 |
obj, |
|
825 |
destination='/%s/%s' % (self.temp_containers[-1], obj), |
|
826 |
content_encoding='utf8', |
|
827 |
content_type='application/json') |
|
828 |
self.assertEqual(r.status_code, 201) |
|
829 |
self.assertEqual( |
|
830 |
r.headers['content-type'], |
|
831 |
'application/json; charset=UTF-8') |
|
832 |
|
|
833 |
"""Check ignore_content_type and content_type""" |
|
834 |
pithos.container = self.temp_containers[-1] |
|
835 |
r = pithos.object_get(obj) |
|
836 |
etag = r.headers['etag'] |
|
837 |
ctype = r.headers['content-type'] |
|
838 |
self.assertEqual(ctype, 'application/json') |
|
839 |
self.info('Cross container copy w. content-type/encoding is OK') |
|
840 |
|
|
841 |
r = pithos.object_copy( |
|
842 |
obj, |
|
843 |
destination='/%s/%s0' % (pithos.container, obj), |
|
844 |
ignore_content_type=True, |
|
845 |
content_type='text/x-python') |
|
846 |
self.assertEqual(r.status_code, 201) |
|
847 |
self.assertNotEqual(r.headers['content-type'], 'application/json') |
|
848 |
r = pithos.object_get(obj + '0') |
|
849 |
self.assertNotEqual(r.headers['content-type'], 'text/x-python') |
|
850 |
|
|
851 |
r = pithos.object_copy( |
|
852 |
obj, |
|
853 |
destination='/%s/%s1' % (pithos.container, obj), |
|
854 |
if_etag_match=etag) |
|
855 |
self.assertEqual(r.status_code, 201) |
|
856 |
self.info('if-etag-match is OK') |
|
857 |
|
|
858 |
r = pithos.object_copy( |
|
859 |
obj, |
|
860 |
destination='/%s/%s2' % (pithos.container, obj), |
|
861 |
if_etag_not_match='lalala') |
|
862 |
self.assertEqual(r.status_code, 201) |
|
863 |
self.info('if-etag-not-match is OK') |
|
864 |
|
|
865 |
r = pithos.object_copy( |
|
866 |
'%s2' % obj, |
|
867 |
destination='/%s/%s3' % (pithos.container, obj), |
|
868 |
format='xml', |
|
869 |
public=True) |
|
870 |
self.assertEqual(r.status_code, 201) |
|
871 |
self.assertTrue('xml' in r.headers['content-type']) |
|
872 |
|
|
873 |
r = pithos.get_object_info(obj + '3') |
|
874 |
self.assertTrue('x-object-public' in r) |
|
875 |
self.info('Publish, format and source-version are OK') |
|
876 |
|
|
761 | 877 |
@classmethod |
762 | 878 |
def tearDownClass(cls): # noqa |
763 | 879 |
"""Clean up""" |
Also available in: Unified diff