Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / permissions.py @ 7e402b46

History | View | Annotate | Download (15.4 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api.test import PithosAPITest
38
from pithos.api.test.util import get_random_data, get_random_name
39

    
40
from synnefo.lib import join_urls
41

    
42

    
43
class TestPermissions(PithosAPITest):
44
    def setUp(self):
45
        PithosAPITest.setUp(self)
46

    
47
        # create a group
48
        self.users = ['alice', 'bob', 'chuck', 'dan', 'mallory', 'oscar']
49
        self.groups = {'group1': self.users[:2]}
50
        kwargs = dict(('HTTP_X_ACCOUNT_GROUP_%s' % k.upper(), ','.join(v)) for
51
                      k, v in self.groups.items())
52
        url = join_urls(self.pithos_path, self.user)
53
        r = self.post(url, **kwargs)
54
        self.assertEqual(r.status_code, 202)
55

    
56
        self.container = get_random_name()
57
        self.create_container(self.container)
58
        self.object = self.upload_object(self.container)[0]
59
        l = [self.object + '/', self.object + '/a', self.object + 'a',
60
             self.object + 'a/']
61
        for i in l:
62
            self.upload_object(self.container, i)
63

    
64
    def _assert_read(self, object, authorized=None):
65
        authorized = authorized or []
66
        for user in self.users:
67
            url = join_urls(
68
                self.pithos_path, self.user, self.container, object)
69
            r = self.head(url, user=user)
70
            if user in authorized:
71
                self.assertEqual(r.status_code, 200)
72
            else:
73
                self.assertEqual(r.status_code, 403)
74

    
75
            r = self.get(url, user=user)
76
            if user in authorized:
77
                self.assertEqual(r.status_code, 200)
78
            else:
79
                self.assertEqual(r.status_code, 403)
80

    
81
        # check inheritance
82
        info = self.get_object_info(self.container, object)
83
        is_directory = info['Content-Type'] in ('application/directory',
84
                                                'application/folder')
85
        prefix = object + '/'
86
        derivatives = [o['name'] for o in self.list_objects(
87
            self.container, prefix=object) if o['name'] != object]
88
        for o in derivatives:
89
            url = join_urls(self.pithos_path, self.user, self.container, o)
90
            for user in self.users:
91
                if (user in authorized and is_directory and
92
                        o.startswith(prefix)):
93
                    r = self.head(url, user=user)
94
                    self.assertEqual(r.status_code, 200)
95

    
96
                    r = self.get(url, user=user)
97
                    self.assertEqual(r.status_code, 200)
98
                else:
99
                    r = self.head(url, user=user)
100
                    self.assertEqual(r.status_code, 403)
101

    
102
                    r = self.get(url, user=user)
103
                    self.assertEqual(r.status_code, 403)
104

    
105
    def _assert_write(self, object, authorized=None):
106
        authorized = authorized or []
107

    
108
        url = join_urls(self.pithos_path, self.user, self.container, object)
109
        for user in self.users:
110
            if user in authorized:
111
                r = self.get(url)
112
                self.assertEqual(r.status_code, 200)
113
                initial_data = r.content
114

    
115
                # test write access
116
                data = get_random_data()
117
                r = self.post(url, user=user, data=data,
118
                              content_type='application/octet-stream',
119
                              HTTP_CONTENT_LENGTH=str(len(data)),
120
                              HTTP_CONTENT_RANGE='bytes */*')
121
                self.assertEqual(r.status_code, 204)
122

    
123
                # test read access
124
                r = self.get(url, user=user)
125
                self.assertEqual(r.status_code, 200)
126
                server_data = r.content
127
                self.assertEqual(server_data, initial_data + data)
128
            else:
129
                # test write access
130
                data = get_random_data()
131
                r = self.post(url, user=user, data=data,
132
                              content_type='application/octet-stream',
133
                              HTTP_CONTENT_LENGTH=str(len(data)),
134
                              HTTP_CONTENT_RANGE='bytes */*')
135
                self.assertEqual(r.status_code, 403)
136

    
137
        # check inheritance
138
        info = self.get_object_info(self.container, object)
139
        is_directory = info['Content-Type'] in ('application/directory',
140
                                                'application/folder')
141
        prefix = object + '/'
142

    
143
        derivatives = [o['name'] for o in self.list_objects(
144
            self.container, prefix=object) if o['name'] != object]
145

    
146
        for o in derivatives:
147
            url = join_urls(self.pithos_path, self.user, self.container, o)
148
            for user in self.users:
149
                if (user in authorized and is_directory and
150
                        o.startswith(prefix)):
151
                    # get initial data
152
                    r = self.get(url)
153
                    self.assertEqual(r.status_code, 200)
154
                    initial_data = r.content
155

    
156
                    # test write access
157
                    data = get_random_data()
158
                    r = self.post(url, user=user, data=data,
159
                                  content_type='application/octet-stream',
160
                                  HTTP_CONTENT_LENGTH=str(len(data)),
161
                                  HTTP_CONTENT_RANGE='bytes */*')
162
                    self.assertEqual(r.status_code, 204)
163

    
164
                    # test read access
165
                    r = self.get(url, user=user)
166
                    self.assertEqual(r.status_code, 200)
167
                    server_data = r.content
168
                    self.assertEqual(server_data, initial_data + data)
169
                    initial_data = server_data
170
                else:
171
                    # test write access
172
                    data = get_random_data()
173
                    r = self.post(url, user=user, data=data,
174
                                  content_type='application/octet-stream',
175
                                  HTTP_CONTENT_LENGTH=str(len(data)),
176
                                  HTTP_CONTENT_RANGE='bytes */*')
177
                    self.assertEqual(r.status_code, 403)
178

    
179
    def test_group_read(self):
180
        group = self.groups.keys()[0]
181
        members = self.groups[group]
182
        url = join_urls(
183
            self.pithos_path, self.user, self.container, self.object)
184
        r = self.post(
185
            url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
186
            HTTP_X_OBJECT_SHARING='read=%s:%s' % (self.user, group))
187
        self.assertEqual(r.status_code, 202)
188
        self._assert_read(self.object, authorized=members)
189

    
190
    def test_read_many(self):
191
        l = self.users[:2]
192
        url = join_urls(
193
            self.pithos_path, self.user, self.container, self.object)
194
        r = self.post(
195
            url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
196
            HTTP_X_OBJECT_SHARING='read=%s' % ','.join(l))
197
        self.assertEqual(r.status_code, 202)
198
        self._assert_read(self.object, authorized=l)
199

    
200
    def test_read_by_everyone(self):
201
        url = join_urls(
202
            self.pithos_path, self.user, self.container, self.object)
203
        r = self.post(
204
            url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
205
            HTTP_X_OBJECT_SHARING='read=*')
206
        self.assertEqual(r.status_code, 202)
207
        self._assert_read(self.object, authorized=self.users)
208

    
209
    def test_read_directory(self):
210
        url = join_urls(
211
            self.pithos_path, self.user, self.container, self.object)
212
        for type in ('application/directory', 'application/folder'):
213
            # change content type
214
            r = self.put(url, data='', content_type=type,
215
                         HTTP_X_MOVE_FROM='/%s/%s' % (
216
                             self.container, self.object))
217
            self.assertEqual(r.status_code, 201)
218
            info = self.get_object_info(self.container, self.object)
219
            self.assertEqual(info['Content-Type'], type)
220

    
221
            url = join_urls(
222
                self.pithos_path, self.user, self.container, self.object)
223
            r = self.post(
224
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
225
                HTTP_X_OBJECT_SHARING='read=*')
226
            self._assert_read(self.object, self.users)
227

    
228
            url = join_urls(
229
                self.pithos_path, self.user, self.container, self.object)
230
            r = self.post(
231
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
232
                HTTP_X_OBJECT_SHARING='read=%s' % ','.join(
233
                    self.users[:2]))
234
            self._assert_read(self.object, self.users[:2])
235

    
236
            group = self.groups.keys()[0]
237
            members = self.groups[group]
238
            url = join_urls(
239
                self.pithos_path, self.user, self.container, self.object)
240
            r = self.post(
241
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
242
                HTTP_X_OBJECT_SHARING='read=%s:%s' % (self.user, group))
243
            self._assert_read(self.object, members)
244

    
245
    def test_group_write(self):
246
        group = self.groups.keys()[0]
247
        members = self.groups[group]
248
        url = join_urls(
249
            self.pithos_path, self.user, self.container, self.object)
250
        r = self.post(
251
            url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
252
            HTTP_X_OBJECT_SHARING='write=%s:%s' % (self.user, group))
253
        self.assertEqual(r.status_code, 202)
254
        self._assert_write(self.object, authorized=members)
255

    
256
    def test_write_many(self):
257
        l = self.users[:2]
258
        url = join_urls(
259
            self.pithos_path, self.user, self.container, self.object)
260
        r = self.post(
261
            url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
262
            HTTP_X_OBJECT_SHARING='write=%s' % ','.join(l))
263
        self.assertEqual(r.status_code, 202)
264
        self._assert_write(self.object, authorized=l)
265

    
266
    def test_write_by_everyone(self):
267
        url = join_urls(
268
            self.pithos_path, self.user, self.container, self.object)
269
        r = self.post(
270
            url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
271
            HTTP_X_OBJECT_SHARING='write=*')
272
        self.assertEqual(r.status_code, 202)
273
        self._assert_write(self.object, authorized=self.users)
274

    
275
    def test_write_directory(self):
276
        url = join_urls(
277
            self.pithos_path, self.user, self.container, self.object)
278
        for type in ('application/directory', 'application/folder'):
279
            # change content type
280
            r = self.put(url, data='', content_type=type,
281
                         HTTP_X_MOVE_FROM='/%s/%s' % (
282
                             self.container, self.object))
283
            self.assertEqual(r.status_code, 201)
284
            info = self.get_object_info(self.container, self.object)
285
            self.assertEqual(info['Content-Type'], type)
286

    
287
            url = join_urls(
288
                self.pithos_path, self.user, self.container, self.object)
289
            r = self.post(
290
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
291
                HTTP_X_OBJECT_SHARING='write=*')
292
            self._assert_write(self.object, self.users)
293

    
294
            url = join_urls(
295
                self.pithos_path, self.user, self.container, self.object)
296
            r = self.post(
297
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
298
                HTTP_X_OBJECT_SHARING='write=%s' % ','.join(
299
                    self.users[:2]))
300
            self._assert_write(self.object, self.users[:2])
301

    
302
            group = self.groups.keys()[0]
303
            members = self.groups[group]
304
            url = join_urls(
305
                self.pithos_path, self.user, self.container, self.object)
306
            r = self.post(
307
                url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
308
                HTTP_X_OBJECT_SHARING='write=%s:%s' % (self.user, group))
309
            self._assert_write(self.object, members)
310

    
311
    def test_not_allowed(self):
312
        cname = self.create_container()[0]
313
        oname, odata = self.upload_object(cname)[:-1]
314

    
315
        url = join_urls(self.pithos_path, self.user)
316
        r = self.head(url, user='chuck')
317
        self.assertEqual(r.status_code, 403)
318
        r = self.get(url, user='chuck')
319
        self.assertEqual(r.status_code, 403)
320
        r = self.post(url, user='chuck', data=get_random_data())
321
        self.assertEqual(r.status_code, 403)
322

    
323
        url = join_urls(self.pithos_path, self.user, cname)
324
        r = self.head(url, user='chuck')
325
        self.assertEqual(r.status_code, 403)
326
        r = self.get(url, user='chuck')
327
        self.assertEqual(r.status_code, 403)
328
        r = self.put(url, user='chuck', data=get_random_data())
329
        self.assertEqual(r.status_code, 403)
330
        r = self.post(url, user='chuck', data=get_random_data())
331
        self.assertEqual(r.status_code, 403)
332
        r = self.delete(url, user='chuck')
333
        self.assertEqual(r.status_code, 403)
334

    
335
        url = join_urls(self.pithos_path, self.user, cname, oname)
336
        r = self.head(url, user='chuck')
337
        self.assertEqual(r.status_code, 403)
338
        r = self.get(url, user='chuck')
339
        self.assertEqual(r.status_code, 403)
340
        r = self.put(url, user='chuck', data=get_random_data())
341
        self.assertEqual(r.status_code, 403)
342
        r = self.post(url, user='chuck', data=get_random_data())
343
        self.assertEqual(r.status_code, 403)
344
        r = self.delete(url, user='chuck')
345
        self.assertEqual(r.status_code, 403)
346

    
347
    def test_multiple_inheritance(self):
348
        cname = self.container
349
        folder = self.create_folder(cname, HTTP_X_OBJECT_SHARING='write=*')[0]
350
        subfolder = self.create_folder(cname, '%s/%s' % (folder,
351
                                                         get_random_name()))[0]
352
        self.upload_object(cname, '%s/%s' % (subfolder, get_random_name()))
353

    
354
        self._assert_read(subfolder, self.users)
355
        self._assert_write(subfolder, self.users)
356

    
357
        # share object for read only
358
        url = join_urls(self.pithos_path, self.user, cname, subfolder)
359
        self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
360
                  HTTP_X_OBJECT_SHARING='read=*')
361

    
362
        self._assert_read(subfolder, self.users)
363
        self._assert_write(subfolder, [])