Revision f53483d8

b/snf-pithos-app/pithos/api/test/__init__.py
34 34
# interpreted as representing official policies, either expressed
35 35
# or implied, of GRNET S.A.
36 36

  
37
from urlparse import urlunsplit, urlsplit
37
from urlparse import urlunsplit, urlsplit, urlparse
38 38
from xml.dom import minidom
39 39
from urllib import quote, unquote
40 40

  
......
48 48
from synnefo.lib import join_urls
49 49

  
50 50
from django.test import TestCase
51
from django.test.client import Client, MULTIPART_CONTENT, FakePayload
51 52
from django.test.simple import DjangoTestSuiteRunner
52 53
from django.conf import settings
53 54
from django.utils.http import urlencode
......
102 103
    'django.db.backends.oracle': 'oracle'}
103 104

  
104 105

  
105
def prepate_db_connection():
106
def prepare_db_connection():
106 107
    """Build pithos backend connection string from django default database"""
107 108

  
108 109
    db = settings.DATABASES['default']
......
141 142
    def setup_databases(self, **kwargs):
142 143
        old_names, mirrors = super(PithosTestSuiteRunner,
143 144
                                   self).setup_databases(**kwargs)
144
        prepate_db_connection()
145
        prepare_db_connection()
145 146
        return old_names, mirrors
146 147

  
147 148
    def teardown_databases(self, old_config, **kwargs):
......
151 152
                                                              **kwargs)
152 153

  
153 154

  
155
class PithosTestClient(Client):
156
    def copy(self, path, data={}, content_type=MULTIPART_CONTENT,
157
             follow=False, **extra):
158
        """
159
        Send a resource to the server using COPY.
160
        """
161
        parsed = urlparse(path)
162
        r = {
163
            'CONTENT_TYPE':    'text/html; charset=utf-8',
164
            'PATH_INFO':       self._get_path(parsed),
165
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
166
            'REQUEST_METHOD': 'COPY',
167
            'wsgi.input':      FakePayload('')
168
        }
169
        r.update(extra)
170

  
171
        response = self.request(**r)
172
        if follow:
173
            response = self._handle_redirects(response, **extra)
174
        return response
175

  
176
    def move(self, path, data={}, content_type=MULTIPART_CONTENT,
177
             follow=False, **extra):
178
        """
179
        Send a resource to the server using MOVE.
180
        """
181
        parsed = urlparse(path)
182
        r = {
183
            'CONTENT_TYPE':    'text/html; charset=utf-8',
184
            'PATH_INFO':       self._get_path(parsed),
185
            'QUERY_STRING':    urlencode(data, doseq=True) or parsed[4],
186
            'REQUEST_METHOD': 'MOVE',
187
            'wsgi.input':      FakePayload('')
188
        }
189
        r.update(extra)
190

  
191
        response = self.request(**r)
192
        if follow:
193
            response = self._handle_redirects(response, **extra)
194
        return response
195

  
196

  
154 197
class PithosAPITest(TestCase):
155 198
    def setUp(self):
199
        self.client = PithosTestClient()
200

  
156 201
        # Override default block size to spead up tests
157 202
        pithos_settings.BACKEND_BLOCK_SIZE = TEST_BLOCK_SIZE
158 203
        pithos_settings.BACKEND_HASH_ALGORITHM = TEST_HASH_ALGORITHM
......
177 222
            self.delete_container_content(c['name'])
178 223
            self.delete_container(c['name'])
179 224

  
180
    def head(self, url, user='user', data={}, follow=False, **extra):
225
    def head(self, url, user='user', token='DummyToken', data={}, follow=False,
226
             **extra):
181 227
        with astakos_user(user):
182 228
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
183
            extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
229
            if token:
230
                extra['HTTP_X_AUTH_TOKEN'] = token
184 231
            response = self.client.head(url, data, follow, **extra)
185 232
        return response
186 233

  
187
    def get(self, url, user='user', data={}, follow=False, **extra):
234
    def get(self, url, user='user', token='DummyToken', data={}, follow=False,
235
            **extra):
188 236
        with astakos_user(user):
189 237
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
190
            extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
238
            if token:
239
                extra['HTTP_X_AUTH_TOKEN'] = token
191 240
            response = self.client.get(url, data, follow, **extra)
192 241
        return response
193 242

  
194
    def delete(self, url, user='user', data={}, follow=False, **extra):
243
    def delete(self, url, user='user', token='DummyToken', data={},
244
               follow=False, **extra):
195 245
        with astakos_user(user):
196 246
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
197
            extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
247
            if token:
248
                extra['HTTP_X_AUTH_TOKEN'] = token
198 249
            response = self.client.delete(url, data, follow, **extra)
199 250
        return response
200 251

  
201
    def post(self, url, user='user', data={},
252
    def post(self, url, user='user', token='DummyToken', data={},
202 253
             content_type='application/octet-stream', follow=False, **extra):
203 254
        with astakos_user(user):
204 255
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
205
            extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
256
            if token:
257
                extra['HTTP_X_AUTH_TOKEN'] = token
206 258
            response = self.client.post(url, data, content_type, follow,
207 259
                                        **extra)
208 260
        return response
209 261

  
210
    def put(self, url, user='user', data={},
262
    def put(self, url, user='user', token='DummyToken', data={},
211 263
            content_type='application/octet-stream', follow=False, **extra):
212 264
        with astakos_user(user):
213 265
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
214
            extra.setdefault('HTTP_X_AUTH_TOKEN', 'token')
266
            if token:
267
                extra['HTTP_X_AUTH_TOKEN'] = token
215 268
            response = self.client.put(url, data, content_type, follow,
216 269
                                       **extra)
217 270
        return response
218 271

  
272
    def copy(self, url, user='user', token='DummyToken', data={},
273
             content_type='application/octet-stream', follow=False, **extra):
274
        with astakos_user(user):
275
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
276
            if token:
277
                extra['HTTP_X_AUTH_TOKEN'] = token
278
            response = self.client.copy(url, data, content_type, follow,
279
                                        **extra)
280
        return response
281

  
282
    def move(self, url, user='user', token='DummyToken', data={},
283
             content_type='application/octet-stream', follow=False, **extra):
284
        with astakos_user(user):
285
            extra = dict((quote(k), quote(v)) for k, v in extra.items())
286
            if token:
287
                extra['HTTP_X_AUTH_TOKEN'] = token
288
            response = self.client.move(url, data, content_type, follow,
289
                                        **extra)
290
        return response
291

  
219 292
    def update_account_meta(self, meta, user=None):
220 293
        user = user or self.user
221 294
        kwargs = dict(
......
223 296
        url = join_urls(self.pithos_path, user)
224 297
        r = self.post('%s?update=' % url, user=user, **kwargs)
225 298
        self.assertEqual(r.status_code, 202)
226
        account_meta = self.get_account_meta()
227
        (self.assertTrue(k in account_meta) for k in meta.keys())
228
        (self.assertEqual(account_meta[k], v) for k, v in meta.items())
299
        account_meta = self.get_account_meta(user=user)
300
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
301
            k in meta.keys())
302
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
303
            k, v in meta.items())
304

  
305
    def reset_account_meta(self, meta, user=None):
306
        user = user or self.user
307
        kwargs = dict(
308
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
309
        url = join_urls(self.pithos_path, user)
310
        r = self.post(url, user=user, **kwargs)
311
        self.assertEqual(r.status_code, 202)
312
        account_meta = self.get_account_meta(user=user)
313
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
314
            k in meta.keys())
315
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
316
            k, v in meta.items())
229 317

  
230 318
    def delete_account_meta(self, meta, user=None):
231 319
        user = user or self.user
232
        transform = (lambda k: 'HTTP_X_ACCOUNT_META_%s' %
233
                     k.replace('-', '_').upper())
320
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
234 321
        kwargs = dict((transform(k), '') for k, v in meta.items())
235 322
        url = join_urls(self.pithos_path, user)
236 323
        r = self.post('%s?update=' % url, user=user, **kwargs)
237 324
        self.assertEqual(r.status_code, 202)
238
        account_meta = self.get_account_meta()
239
        (self.assertTrue(k not in account_meta) for k in meta.keys())
325
        account_meta = self.get_account_meta(user=user)
326
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
327
            k in meta.keys())
240 328
        return r
241 329

  
242 330
    def delete_account_groups(self, groups, user=None):
243 331
        user = user or self.user
244 332
        url = join_urls(self.pithos_path, user)
245
        transform = (lambda k: 'HTTP_X_ACCOUNT_GROUP_%s' %
246
                     k.replace('-', '_').upper())
247
        kwargs = dict((transform(k), '') for k, v in groups.items())
248
        r = self.post('%s?update=' % url, user=user, **kwargs)
333
        r = self.post('%s?update=' % url, user=user, **groups)
249 334
        self.assertEqual(r.status_code, 202)
250 335
        account_groups = self.get_account_groups()
251 336
        (self.assertTrue(k not in account_groups) for k in groups.keys())
......
438 523
    def get_object_meta(self, container, object, version=None, until=None,
439 524
                        user=None):
440 525
        prefix = 'X-Object-Meta-'
526
        user = user or self.user
441 527
        r = self.get_object_info(container, object, version, until=until,
442 528
                                 user=user)
443 529
        headers = dict(r._headers.values())
b/snf-pithos-app/pithos/api/test/listing.py
37 37

  
38 38
import django.utils.simplejson as json
39 39

  
40

  
40 41
class ListSharing(PithosAPITest):
41 42
    def _build_structure(self, user=None):
42 43
        user = user or self.user
b/snf-pithos-app/pithos/api/test/objects.py
35 35
# or implied, of GRNET S.A.
36 36

  
37 37
from collections import defaultdict
38
from urllib import quote
38
from urllib import quote, unquote
39 39
from functools import partial
40 40

  
41 41
from pithos.api.test import (PithosAPITest, pithos_settings,
......
60 60

  
61 61

  
62 62
class ObjectHead(PithosAPITest):
63
    def setUp(self):
63
    def test_get_object_meta(self):
64 64
        cname = self.create_container()[0]
65 65
        oname, odata = self.upload_object(cname)[:-1]
66 66

  
67
        url = join_urls(self.pithos_path, cname, oname)
67
        url = join_urls(self.pithos_path, self.user, cname, oname)
68
        r = self.head(url)
69

  
70
        mandatory = ['Etag',
71
                     'Content-Length',
72
                     'Content-Type',
73
                     'Last-Modified',
74
                     'X-Object-Hash',
75
                     'X-Object-UUID',
76
                     'X-Object-Version',
77
                     'X-Object-Version-Timestamp',
78
                     'X-Object-Modified-By']
79
        for i in mandatory:
80
            self.assertTrue(i in r)
81

  
82
        r = self.post(url, content_type='',
83
                      HTTP_CONTENT_ENCODING='gzip',
84
                      HTTP_CONTENT_DISPOSITION=(
85
                          'attachment; filename="%s"' % oname))
86
        self.assertEqual(r.status_code, 202)
87

  
68 88
        r = self.head(url)
69
        map(lambda i: self.assertTrue(i in r),
70
            ['Etag',
71
             'Content-Length',
72
             'Content-Type',
73
             'Last-Modified',
74
             'Content-Encoding',
75
             'Content-Disposition',
76
             'X-Object-Hash',
77
             'X-Object-UUID',
78
             'X-Object-Version',
79
             'X-Object-Version-Timestamp',
80
             'X-Object-Modified-By',
81
             'X-Object-Manifest'])
89
        for i in mandatory:
90
            self.assertTrue(i in r)
91
        self.assertTrue('Content-Encoding' in r)
92
        self.assertEqual(r['Content-Encoding'], 'gzip')
93
        self.assertTrue('Content-Disposition' in r)
94
        self.assertEqual(unquote(r['Content-Disposition']),
95
                         'attachment; filename="%s"' % oname)
96

  
97
        prefix = 'myobject/'
98
        data = ''
99
        for i in range(random.randint(2, 10)):
100
            part = '%s%d' % (prefix, i)
101
            data += self.upload_object(cname, oname=part)[1]
102

  
103
        manifest = '%s/%s' % (cname, prefix)
104
        oname = get_random_name()
105
        url = join_urls(self.pithos_path, self.user, cname, oname)
106
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest)
107
        self.assertEqual(r.status_code, 201)
108

  
109
        r = self.head(url)
110
        for i in mandatory:
111
            self.assertTrue(i in r)
112
        self.assertTrue('X-Object-Manifest' in r)
113
        self.assertEqual(r['X-Object-Manifest'], manifest)
82 114

  
83 115

  
84 116
class ObjectGet(PithosAPITest):
......
702 734
        self.assertEqual(r.content, data)
703 735

  
704 736

  
705
class ObjectCopy(PithosAPITest):
737
class ObjectPutCopy(PithosAPITest):
706 738
    def setUp(self):
707 739
        PithosAPITest.setUp(self)
708 740
        self.container = 'c1'
......
761 793
            self.assertTrue('X-Object-Hash' in r)
762 794
            self.assertEqual(r['X-Object-Hash'], self.etag)
763 795

  
796
    def test_copy_from_other_account(self):
797
        cname = 'c2'
798
        self.create_container(cname, user='chuck')
799
        self.create_container(cname, user='alice')
800

  
801
        # share object for read with alice
802
        url = join_urls(self.pithos_path, self.user, self.container,
803
                        self.object)
804
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
805
                      HTTP_X_OBJECT_SHARING='read=alice')
806
        self.assertEqual(r.status_code, 202)
807

  
808
        # assert not allowed for chuck
809
        oname = get_random_name()
810
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
811
        r = self.put(url, data='', user='chuck',
812
                     HTTP_X_OBJECT_META_TEST='testcopy',
813
                     HTTP_X_COPY_FROM='/%s/%s' % (
814
                         self.container, self.object),
815
                     HTTP_X_SOURCE_ACCOUNT='user')
816

  
817
        self.assertEqual(r.status_code, 403)
818

  
819
        # assert copy success for alice
820
        url = join_urls(self.pithos_path, 'alice', cname, oname)
821
        r = self.put(url, data='', user='alice',
822
                     HTTP_X_OBJECT_META_TEST='testcopy',
823
                     HTTP_X_COPY_FROM='/%s/%s' % (
824
                         self.container, self.object),
825
                     HTTP_X_SOURCE_ACCOUNT='user')
826
        self.assertEqual(r.status_code, 201)
827

  
828
        # assert access the new object
829
        r = self.head(url, user='alice')
830
        self.assertEqual(r.status_code, 200)
831
        self.assertTrue('X-Object-Meta-Test' in r)
832
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
833

  
834
        # assert etag is the same
835
        self.assertTrue('X-Object-Hash' in r)
836
        self.assertEqual(r['X-Object-Hash'], self.etag)
837

  
838
        # share object for write
839
        url = join_urls(self.pithos_path, self.user, self.container,
840
                        self.object)
841
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
842
                      HTTP_X_OBJECT_SHARING='write=dan')
843
        self.assertEqual(r.status_code, 202)
844

  
845
        # assert not allowed copy for alice
846
        url = join_urls(self.pithos_path, 'alice', cname, oname)
847
        r = self.put(url, data='', user='alice',
848
                     HTTP_X_OBJECT_META_TEST='testcopy',
849
                     HTTP_X_COPY_FROM='/%s/%s' % (
850
                         self.container, self.object),
851
                     HTTP_X_SOURCE_ACCOUNT='user')
852
        self.assertEqual(r.status_code, 403)
853

  
854
        # assert allowed copy for dan
855
        self.create_container(cname, user='dan')
856
        url = join_urls(self.pithos_path, 'dan', cname, oname)
857
        r = self.put(url, data='', user='dan',
858
                     HTTP_X_OBJECT_META_TEST='testcopy',
859
                     HTTP_X_COPY_FROM='/%s/%s' % (
860
                         self.container, self.object),
861
                     HTTP_X_SOURCE_ACCOUNT='user')
862
        self.assertEqual(r.status_code, 201)
863

  
864
        # assert access the new object
865
        r = self.head(url, user='dan')
866
        self.assertEqual(r.status_code, 200)
867
        self.assertTrue('X-Object-Meta-Test' in r)
868
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
869

  
870
        # assert etag is the same
871
        self.assertTrue('X-Object-Hash' in r)
872
        self.assertEqual(r['X-Object-Hash'], self.etag)
873

  
874
        # assert source object still exists
875
        url = join_urls(self.pithos_path, self.user, self.container,
876
                        self.object)
877
        r = self.head(url)
878
        self.assertEqual(r.status_code, 200)
879
        # assert etag is the same
880
        self.assertTrue('X-Object-Hash' in r)
881
        self.assertEqual(r['X-Object-Hash'], self.etag)
882

  
883
        r = self.get(url)
884
        self.assertEqual(r.status_code, 200)
885
        # assert etag is the same
886
        self.assertTrue('X-Object-Hash' in r)
887
        self.assertEqual(r['X-Object-Hash'], self.etag)
888

  
764 889
    def test_copy_invalid(self):
765 890
        # copy from non-existent object
766 891
        oname = get_random_name()
......
822 947
        self.assertEqual(r.status_code, 404)
823 948

  
824 949

  
825
class ObjectMove(PithosAPITest):
950
class ObjectPutMove(PithosAPITest):
826 951
    def setUp(self):
827 952
        PithosAPITest.setUp(self)
828 953
        self.container = 'c1'
......
901 1026
        r = self.head(url)
902 1027
        self.assertEqual(r.status_code, 404)
903 1028

  
1029
    def test_move_from_other_account(self):
1030
        cname = 'c2'
1031
        self.create_container(cname, user='chuck')
1032
        self.create_container(cname, user='alice')
1033

  
1034
        # share object for read with alice
1035
        url = join_urls(self.pithos_path, self.user, self.container,
1036
                        self.object)
1037
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1038
                      HTTP_X_OBJECT_SHARING='read=alice')
1039
        self.assertEqual(r.status_code, 202)
1040

  
1041
        # assert not allowed move for chuck
1042
        oname = get_random_name()
1043
        url = join_urls(self.pithos_path, 'chuck', cname, oname)
1044
        r = self.put(url, data='', user='chuck',
1045
                     HTTP_X_OBJECT_META_TEST='testcopy',
1046
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1047
                         self.container, self.object),
1048
                     HTTP_X_SOURCE_ACCOUNT='user')
1049

  
1050
        self.assertEqual(r.status_code, 403)
1051

  
1052
        # assert no new object was created
1053
        r = self.head(url, user='chuck')
1054
        self.assertEqual(r.status_code, 404)
1055

  
1056
        # assert not allowed move for alice
1057
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1058
        r = self.put(url, data='', user='alice',
1059
                     HTTP_X_OBJECT_META_TEST='testcopy',
1060
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1061
                         self.container, self.object),
1062
                     HTTP_X_SOURCE_ACCOUNT='user')
1063
        self.assertEqual(r.status_code, 403)
1064

  
1065
        # assert no new object was created
1066
        r = self.head(url, user='alice')
1067
        self.assertEqual(r.status_code, 404)
1068

  
1069
        # share object for write with dan
1070
        url = join_urls(self.pithos_path, self.user, self.container,
1071
                        self.object)
1072
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
1073
                      HTTP_X_OBJECT_SHARING='write=dan')
1074
        self.assertEqual(r.status_code, 202)
1075

  
1076
        # assert not allowed move for alice
1077
        url = join_urls(self.pithos_path, 'alice', cname, oname)
1078
        r = self.put(url, data='', user='alice',
1079
                     HTTP_X_OBJECT_META_TEST='testcopy',
1080
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1081
                         self.container, self.object),
1082
                     HTTP_X_SOURCE_ACCOUNT='user')
1083
        self.assertEqual(r.status_code, 403)
1084

  
1085
        # assert no new object was created
1086
        r = self.head(url, user='alice')
1087
        self.assertEqual(r.status_code, 404)
1088

  
1089
        # assert not allowed move for dan
1090
        self.create_container(cname, user='dan')
1091
        url = join_urls(self.pithos_path, 'dan', cname, oname)
1092
        r = self.put(url, data='', user='dan',
1093
                     HTTP_X_OBJECT_META_TEST='testcopy',
1094
                     HTTP_X_MOVE_FROM='/%s/%s' % (
1095
                         self.container, self.object),
1096
                     HTTP_X_SOURCE_ACCOUNT='user')
1097
        self.assertEqual(r.status_code, 403)
1098

  
1099
        # assert no new object was created
1100
        r = self.head(url, user='dan')
1101
        self.assertEqual(r.status_code, 404)
1102

  
1103

  
1104
class ObjectCopy(PithosAPITest):
1105
    def setUp(self):
1106
        PithosAPITest.setUp(self)
1107
        self.container = 'c1'
1108
        self.create_container(self.container)
1109
        self.object, self.data = self.upload_object(self.container)[:-1]
1110

  
1111
        url = join_urls(
1112
            self.pithos_path, self.user, self.container, self.object)
1113
        r = self.head(url)
1114
        self.etag = r['X-Object-Hash']
1115

  
1116
    def test_copy(self):
1117
        with AssertMappingInvariant(self.get_object_info, self.container,
1118
                                    self.object):
1119
            oname = get_random_name()
1120
            # copy object
1121
            url = join_urls(self.pithos_path, self.user, self.container,
1122
                            self.object)
1123
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1124
                          HTTP_DESTINATION='/%s/%s' % (self.container,
1125
                                                       oname))
1126
            # assert copy success
1127
            url = join_urls(self.pithos_path, self.user, self.container,
1128
                            oname)
1129
            self.assertEqual(r.status_code, 201)
1130

  
1131
            # assert access the new object
1132
            r = self.head(url)
1133
            self.assertEqual(r.status_code, 200)
1134
            self.assertTrue('X-Object-Meta-Test' in r)
1135
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1136

  
1137
            # assert etag is the same
1138
            self.assertTrue('X-Object-Hash' in r)
1139
            self.assertEqual(r['X-Object-Hash'], self.etag)
1140

  
1141
            # assert source object still exists
1142
            url = join_urls(self.pithos_path, self.user, self.container,
1143
                            self.object)
1144
            r = self.head(url)
1145
            self.assertEqual(r.status_code, 200)
1146

  
1147
            # assert etag is the same
1148
            self.assertTrue('X-Object-Hash' in r)
1149
            self.assertEqual(r['X-Object-Hash'], self.etag)
1150

  
1151
            r = self.get(url)
1152
            self.assertEqual(r.status_code, 200)
1153

  
1154
            # assert etag is the same
1155
            self.assertTrue('X-Object-Hash' in r)
1156
            self.assertEqual(r['X-Object-Hash'], self.etag)
1157

  
1158
            # copy object to other container (not existing)
1159
            cname = get_random_name()
1160
            url = join_urls(self.pithos_path, self.user, self.container,
1161
                            self.object)
1162
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1163
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1164

  
1165
            # assert destination container does not exist
1166
            url = join_urls(self.pithos_path, self.user, cname,
1167
                            self.object)
1168
            self.assertEqual(r.status_code, 404)
1169

  
1170
            # create container
1171
            self.create_container(cname)
1172

  
1173
            # copy object to other container (existing)
1174
            url = join_urls(self.pithos_path, self.user, self.container,
1175
                            self.object)
1176
            r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1177
                          HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1178

  
1179
            # assert copy success
1180
            url = join_urls(self.pithos_path, self.user, cname,
1181
                            self.object)
1182
            self.assertEqual(r.status_code, 201)
1183

  
1184
            # assert access the new object
1185
            r = self.head(url)
1186
            self.assertEqual(r.status_code, 200)
1187
            self.assertTrue('X-Object-Meta-Test' in r)
1188
            self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1189

  
1190
            # assert etag is the same
1191
            self.assertTrue('X-Object-Hash' in r)
1192
            self.assertEqual(r['X-Object-Hash'], self.etag)
1193

  
1194
                        # assert source object still exists
1195
            url = join_urls(self.pithos_path, self.user, self.container,
1196
                            self.object)
1197
            r = self.head(url)
1198
            self.assertEqual(r.status_code, 200)
1199

  
1200
            # assert etag is the same
1201
            self.assertTrue('X-Object-Hash' in r)
1202
            self.assertEqual(r['X-Object-Hash'], self.etag)
1203

  
1204
            r = self.get(url)
1205
            self.assertEqual(r.status_code, 200)
1206

  
1207
            # assert etag is the same
1208
            self.assertTrue('X-Object-Hash' in r)
1209
            self.assertEqual(r['X-Object-Hash'], self.etag)
1210

  
1211
    def test_copy_to_other_account(self):
1212
        # create a container under alice account
1213
        cname = self.create_container(user='alice')[0]
1214

  
1215
        # create a folder under this container
1216
        folder = self.create_folder(cname, user='alice')[0]
1217

  
1218
        oname = get_random_name()
1219

  
1220
        # copy object to other account container
1221
        url = join_urls(self.pithos_path, self.user, self.container,
1222
                        self.object)
1223
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1224
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1225
                      HTTP_DESTINATION_ACCOUNT='alice')
1226
        self.assertEqual(r.status_code, 403)
1227

  
1228
        # share object for read with user
1229
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1230
        r = self.post(url, user='alice', content_type='',
1231
                      HTTP_CONTENT_RANGE='bytes */*',
1232
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1233
        self.assertEqual(r.status_code, 202)
1234

  
1235
        # assert copy object still is not allowed
1236
        url = join_urls(self.pithos_path, self.user, self.container,
1237
                        self.object)
1238
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1239
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1240
                      HTTP_DESTINATION_ACCOUNT='alice')
1241
        self.assertEqual(r.status_code, 403)
1242

  
1243
        # share object for write with user
1244
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1245
        r = self.post(url, user='alice',  content_type='',
1246
                      HTTP_CONTENT_RANGE='bytes */*',
1247
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1248
        self.assertEqual(r.status_code, 202)
1249

  
1250
        # assert copy object now is allowed
1251
        url = join_urls(self.pithos_path, self.user, self.container,
1252
                        self.object)
1253
        r = self.copy(url, HTTP_X_OBJECT_META_TEST='testcopy',
1254
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1255
                      HTTP_DESTINATION_ACCOUNT='alice')
1256
        self.assertEqual(r.status_code, 201)
1257

  
1258
        # assert access the new object
1259
        url = join_urls(self.pithos_path, 'alice', cname, folder, oname)
1260
        r = self.head(url, user='alice')
1261
        self.assertEqual(r.status_code, 200)
1262
        self.assertTrue('X-Object-Meta-Test' in r)
1263
        self.assertEqual(r['X-Object-Meta-Test'], 'testcopy')
1264

  
1265
        # assert etag is the same
1266
        self.assertTrue('X-Object-Hash' in r)
1267
        self.assertEqual(r['X-Object-Hash'], self.etag)
1268

  
1269
        # assert source object still exists
1270
        url = join_urls(self.pithos_path, self.user, self.container,
1271
                        self.object)
1272
        r = self.head(url)
1273
        self.assertEqual(r.status_code, 200)
1274

  
1275
        # assert etag is the same
1276
        self.assertTrue('X-Object-Hash' in r)
1277
        self.assertEqual(r['X-Object-Hash'], self.etag)
1278

  
1279
        r = self.get(url)
1280
        self.assertEqual(r.status_code, 200)
1281

  
1282
        # assert etag is the same
1283
        self.assertTrue('X-Object-Hash' in r)
1284
        self.assertEqual(r['X-Object-Hash'], self.etag)
1285

  
1286

  
1287
class ObjectMove(PithosAPITest):
1288
    def setUp(self):
1289
        PithosAPITest.setUp(self)
1290
        self.container = 'c1'
1291
        self.create_container(self.container)
1292
        self.object, self.data = self.upload_object(self.container)[:-1]
1293

  
1294
        url = join_urls(
1295
            self.pithos_path, self.user, self.container, self.object)
1296
        r = self.head(url)
1297
        self.etag = r['X-Object-Hash']
1298

  
1299
    def test_move(self):
1300
        oname = get_random_name()
1301

  
1302
        # move object
1303
        url = join_urls(self.pithos_path, self.user, self.container,
1304
                        self.object)
1305
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1306
                      HTTP_DESTINATION='/%s/%s' % (self.container,
1307
                                                   oname))
1308
        # assert move success
1309
        url = join_urls(self.pithos_path, self.user, self.container,
1310
                        oname)
1311
        self.assertEqual(r.status_code, 201)
1312

  
1313
        # assert access the new object
1314
        r = self.head(url)
1315
        self.assertEqual(r.status_code, 200)
1316
        self.assertTrue('X-Object-Meta-Test' in r)
1317
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1318

  
1319
        # assert etag is the same
1320
        self.assertTrue('X-Object-Hash' in r)
1321
        self.assertEqual(r['X-Object-Hash'], self.etag)
1322

  
1323
        # assert source object does not exist
1324
        url = join_urls(self.pithos_path, self.user, self.container,
1325
                        self.object)
1326
        r = self.head(url)
1327
        self.assertEqual(r.status_code, 404)
1328

  
1329
    def test_move_to_other_container(self):
1330
        # move object to other container (not existing)
1331
        cname = get_random_name()
1332
        url = join_urls(self.pithos_path, self.user, self.container,
1333
                        self.object)
1334
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1335
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1336

  
1337
        # assert destination container does not exist
1338
        url = join_urls(self.pithos_path, self.user, cname,
1339
                        self.object)
1340
        self.assertEqual(r.status_code, 404)
1341

  
1342
        # create container
1343
        self.create_container(cname)
1344

  
1345
        # move object to other container (existing)
1346
        url = join_urls(self.pithos_path, self.user, self.container,
1347
                        self.object)
1348
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1349
                      HTTP_DESTINATION='/%s/%s' % (cname, self.object))
1350

  
1351
        # assert move success
1352
        url = join_urls(self.pithos_path, self.user, cname,
1353
                        self.object)
1354
        self.assertEqual(r.status_code, 201)
1355

  
1356
        # assert access the new object
1357
        r = self.head(url)
1358
        self.assertEqual(r.status_code, 200)
1359
        self.assertTrue('X-Object-Meta-Test' in r)
1360
        self.assertEqual(r['X-Object-Meta-Test'], 'testmove')
1361

  
1362
        # assert etag is the same
1363
        self.assertTrue('X-Object-Hash' in r)
1364
        self.assertEqual(r['X-Object-Hash'], self.etag)
1365

  
1366
        # assert source object does not exist
1367
        url = join_urls(self.pithos_path, self.user, self.container,
1368
                        self.object)
1369
        r = self.head(url)
1370
        self.assertEqual(r.status_code, 404)
1371

  
1372
    def test_move_to_other_account(self):
1373
        # create a container under alice account
1374
        cname = self.create_container(user='alice')[0]
1375

  
1376
        # create a folder under this container
1377
        folder = self.create_folder(cname, user='alice')[0]
1378

  
1379
        oname = get_random_name()
1380

  
1381
        # move object to other account container
1382
        url = join_urls(self.pithos_path, self.user, self.container,
1383
                        self.object)
1384
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1385
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1386
                      HTTP_DESTINATION_ACCOUNT='alice')
1387
        self.assertEqual(r.status_code, 403)
1388

  
1389
        # share object for read with user
1390
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1391
        r = self.post(url, user='alice', content_type='',
1392
                      HTTP_CONTENT_RANGE='bytes */*',
1393
                      HTTP_X_OBJECT_SHARING='read=%s' % self.user)
1394
        self.assertEqual(r.status_code, 202)
1395

  
1396
        # assert move object still is not allowed
1397
        url = join_urls(self.pithos_path, self.user, self.container,
1398
                        self.object)
1399
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1400
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1401
                      HTTP_DESTINATION_ACCOUNT='alice')
1402
        self.assertEqual(r.status_code, 403)
1403

  
1404
        # share object for write with user
1405
        url = join_urls(self.pithos_path, 'alice', cname, folder)
1406
        r = self.post(url, user='alice',  content_type='',
1407
                      HTTP_CONTENT_RANGE='bytes */*',
1408
                      HTTP_X_OBJECT_SHARING='write=%s' % self.user)
1409
        self.assertEqual(r.status_code, 202)
1410

  
1411
        # assert move object now is allowed
1412
        url = join_urls(self.pithos_path, self.user, self.container,
1413
                        self.object)
1414
        r = self.move(url, HTTP_X_OBJECT_META_TEST='testmove',
1415
                      HTTP_DESTINATION='/%s/%s/%s' % (cname, folder, oname),
1416
                      HTTP_DESTINATION_ACCOUNT='alice')
1417
        self.assertEqual(r.status_code, 201)
1418

  
1419
        # assert source object does not exist
1420
        url = join_urls(self.pithos_path, self.user, self.container,
1421
                        self.object)
1422
        r = self.head(url)
1423
        self.assertEqual(r.status_code, 404)
1424

  
904 1425

  
905 1426
class ObjectPost(PithosAPITest):
906 1427
    def setUp(self):
......
1277 1798
        self.assertEqual(data, v[2][2])
1278 1799
        append((r['X-Object-Version'], len(data), data))
1279 1800

  
1801
    def test_update_from_other_version(self):
1802
        versions = []
1803
        info = self.get_object_info(self.container, self.object)
1804
        versions.append(info['X-Object-Version'])
1805
        pre_length = int(info['Content-Length'])
1806

  
1807
        # update object
1808
        d1, r = self.upload_object(self.container, self.object,
1809
                                   length=pre_length - 1)[1:]
1810
        self.assertTrue('X-Object-Version' in r)
1811
        versions.append(r['X-Object-Version'])
1812

  
1813
        # update object
1814
        d2, r = self.upload_object(self.container, self.object,
1815
                                   length=pre_length - 2)[1:]
1816
        self.assertTrue('X-Object-Version' in r)
1817
        versions.append(r['X-Object-Version'])
1818

  
1819
        # get previous version
1820
        url = join_urls(self.pithos_path, self.user, self.container,
1821
                        self.object)
1822
        r = self.get('%s?version=list&format=json' % url)
1823
        self.assertEqual(r.status_code, 200)
1824
        l = json.loads(r.content)['versions']
1825
        self.assertEqual(len(l), 3)
1826
        self.assertEqual([str(v[0]) for v in l], versions)
1827

  
1828
        # update with the previous version
1829
        r = self.post(url,
1830
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1831
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1832
                                                       self.object),
1833
                      HTTP_X_SOURCE_VERSION=versions[0])
1834
        self.assertEqual(r.status_code, 204)
1835

  
1836
        # check content
1837
        r = self.get(url)
1838
        content = r.content
1839
        self.assertEqual(len(content), pre_length)
1840
        self.assertEqual(content, self.object_data)
1841

  
1842
        # update object
1843
        d3, r = self.upload_object(self.container, self.object,
1844
                                   length=len(d2) + 1)[1:]
1845
        self.assertTrue('X-Object-Version' in r)
1846
        versions.append(r['X-Object-Version'])
1847

  
1848
        # update with the previous version
1849
        r = self.post(url,
1850
                      HTTP_CONTENT_RANGE='bytes 0-/*',
1851
                      HTTP_X_SOURCE_OBJECT='/%s/%s' % (self.container,
1852
                                                       self.object),
1853
                      HTTP_X_SOURCE_VERSION=versions[-2])
1854
        self.assertEqual(r.status_code, 204)
1855

  
1856
        # check content
1857
        r = self.get(url)
1858
        content = r.content
1859
        self.assertEqual(content, d2 + d3[-1])
1860

  
1280 1861

  
1281 1862
class ObjectDelete(PithosAPITest):
1282 1863
    def setUp(self):
b/snf-pithos-app/pithos/api/test/permissions.py
307 307
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
308 308
                HTTP_X_OBJECT_SHARING='write=%s:%s' % (self.user, group))
309 309
            self._assert_write(self.object, members)
310

  
311
    def test_not_allowed(self):
312
        cname = self.create_container()[0]
313
        oname, odata = self.upload_object(cname)[:-1]
314

  
315
        url = join_urls(self.pithos_path, self.user)
316
        r = self.head(url, user='chuck')
317
        self.assertEqual(r.status_code, 403)
318
        r = self.get(url, user='chuck')
319
        self.assertEqual(r.status_code, 403)
320
        r = self.post(url, user='chuck', data=get_random_data())
321
        self.assertEqual(r.status_code, 403)
322

  
323
        url = join_urls(self.pithos_path, self.user, cname)
324
        r = self.head(url, user='chuck')
325
        self.assertEqual(r.status_code, 403)
326
        r = self.get(url, user='chuck')
327
        self.assertEqual(r.status_code, 403)
328
        r = self.put(url, user='chuck', data=get_random_data())
329
        self.assertEqual(r.status_code, 403)
330
        r = self.post(url, user='chuck', data=get_random_data())
331
        self.assertEqual(r.status_code, 403)
332
        r = self.delete(url, user='chuck')
333
        self.assertEqual(r.status_code, 403)
334

  
335
        url = join_urls(self.pithos_path, self.user, cname, oname)
336
        r = self.head(url, user='chuck')
337
        self.assertEqual(r.status_code, 403)
338
        r = self.get(url, user='chuck')
339
        self.assertEqual(r.status_code, 403)
340
        r = self.put(url, user='chuck', data=get_random_data())
341
        self.assertEqual(r.status_code, 403)
342
        r = self.post(url, user='chuck', data=get_random_data())
343
        self.assertEqual(r.status_code, 403)
344
        r = self.delete(url, user='chuck')
345
        self.assertEqual(r.status_code, 403)
b/snf-pithos-app/pithos/api/test/public.py
58 58
        (self.assertTrue(l in pithos_settings.PUBLIC_URL_ALPHABET) for
59 59
         l in public)
60 60

  
61
        r = self.get(public, user='user2')
61
        r = self.get(public, user='user2', token=None)
62 62
        self.assertEqual(r.status_code, 200)
63 63
        self.assertTrue('X-Object-Public' not in r)
64 64

  
......
212 212
        public2 = self._assert_public_object(cname, oname, odata)
213 213
        self.assertEqual(public, public2)
214 214

  
215
        # delete object histoy until now
215
        # delete object history until now
216 216
        _time.sleep(1)
217 217
        t = datetime.datetime.utcnow()
218 218
        now = int(_time.mktime(t.timetuple()))
......
222 222
        self.assertEqual(r.status_code, 404)
223 223
        r = self.get(public)
224 224
        self.assertEqual(r.status_code, 404)
225

  
226
    def test_public_manifest(self):
227
        cname = self.create_container()[0]
228
        prefix = 'myobject/'
229
        data = ''
230
        for i in range(random.randint(2, 10)):
231
            part = '%s%d' % (prefix, i)
232
            data += self.upload_object(cname, oname=part)[1]
233

  
234
        manifest = '%s/%s' % (cname, prefix)
235
        oname = get_random_name()
236
        url = join_urls(self.pithos_path, self.user, cname, oname)
237
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST=manifest,
238
                     HTTP_X_OBJECT_PUBLIC='true')
239
        self.assertEqual(r.status_code, 201)
240

  
241
        r = self.head(url)
242
        self.assertTrue('X-Object-Manifest' in r)
243
        self.assertEqual(r['X-Object-Manifest'], manifest)
244

  
245
        self.assertTrue('X-Object-Public' in r)
246
        public = r['X-Object-Public']
247

  
248
        r = self.get(public)
249
        self.assertTrue(r.content, data)
250
        #self.assertTrue('X-Object-Manifest' in r)
251
        #self.assertEqual(r['X-Object-Manifest'], manifest)
b/snf-pithos-app/pithos/api/tests.py
40 40
from pithos.api.test.views import *
41 41
from pithos.api.test.unicode import *
42 42
from pithos.api.test.listing import *
43
from pithos.api.test.top_level import *

Also available in: Unified diff