Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / tests.py @ 5ec446aa

History | View | Annotate | Download (10 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import unittest
35
import random
36
import string
37
import datetime
38
import time as _time
39

    
40
import pithos.api.settings as settings
41

    
42
from pithos.api.manage_accounts import ManageAccounts
43

    
44
def get_random_data(length=500):
45
    char_set = string.ascii_uppercase + string.digits
46
    return ''.join(random.choice(char_set) for x in xrange(length))
47

    
48
class TestPublic(unittest.TestCase):
49
    def setUp(self):
50
        self.utils = ManageAccounts()
51
        self.backend = self.utils.backend
52
        self.utils.create_account('account')
53

    
54
    def tearDown(self):
55
        self.utils._delete_account('account')
56
        self.utils.cleanup()
57

    
58
    def assert_not_public_object(self, account, container, object):
59
        public = self.backend.get_object_public(
60
            account, account, container, object
61
        )
62
        self.assertTrue(public == None)
63
        self.assertRaises(
64
            NameError,
65
            self.backend.get_public,
66
            '$$account$$',
67
            public
68
        )
69
        self.assertRaises(
70
            Exception, self.backend._can_read,
71
            '$$account$$', account, container, object
72
        )
73
        return public
74

    
75
    def assert_public_object(self, account, container, object):
76
        public = self.backend.get_object_public(
77
            account, account, container, object
78
        )
79
        self.assertTrue(public != None)
80
        self.assertTrue(len(public) >= settings.PUBLIC_URL_SECURITY)
81
        self.assertTrue(set(public) <= set(settings.PUBLIC_URL_ALPHABET))
82
        self.assertEqual(
83
            self.backend.get_public('$$account$$', public),
84
            (account, container, object)
85
        )
86
        try:
87
            self.backend._can_read('$$account$$', account, container, object)
88
        except Exception:
89
            self.fail('Public object should be readable.')
90
        return public
91

    
92
    def test_set_object_public(self):
93
        self.utils.backend.put_container('account', 'account', 'container')
94
        data = get_random_data(int(random.random()))
95
        self.utils.create_update_object(
96
            'account',
97
            'container',
98
            'object',
99
            'application/octet-stream',
100
            data
101
        )
102
        self.assert_not_public_object('account', 'container', 'object')
103

    
104
        self.backend.permissions.public_set(
105
            'account/container/object',
106
            self.backend.public_url_security,
107
            self.backend.public_url_alphabet
108
        )
109
        self.assert_public_object('account', 'container', 'object')
110

    
111
    def test_set_twice(self):
112
        self.utils.backend.put_container('account', 'account', 'container')
113
        data = get_random_data(int(random.random()))
114
        self.utils.create_update_object(
115
            'account',
116
            'container',
117
            'object',
118
            'application/octet-stream',
119
            data
120
        )
121
        self.backend.permissions.public_set(
122
            'account/container/object',
123
            self.backend.public_url_security,
124
            self.backend.public_url_alphabet
125
        )
126
        public = self.assert_public_object('account', 'container', 'object')
127

    
128
        self.backend.permissions.public_set(
129
            'account/container/object',
130
            self.backend.public_url_security,
131
            self.backend.public_url_alphabet
132
        )
133
        public2 = self.assert_public_object('account', 'container', 'object')
134

    
135
        self.assertEqual(public, public2)
136

    
137
    def test_set_unset_set(self):
138
        self.utils.backend.put_container('account', 'account', 'container')
139
        data = get_random_data(int(random.random()))
140
        self.utils.create_update_object(
141
            'account',
142
            'container',
143
            'object',
144
            'application/octet-stream',
145
            data
146
        )
147
        self.backend.permissions.public_set(
148
            'account/container/object',
149
            self.backend.public_url_security,
150
            self.backend.public_url_alphabet
151
        )
152
        public = self.assert_public_object('account', 'container', 'object')
153

    
154
        self.backend.permissions.public_unset('account/container/object')
155
        self.assert_not_public_object('account', 'container', 'object')
156

    
157
        self.backend.permissions.public_set(
158
            'account/container/object',
159
            self.backend.public_url_security,
160
            self.backend.public_url_alphabet
161
        )
162
        public3 = self.assert_public_object('account', 'container', 'object')
163

    
164
        self.assertTrue(public != public3)
165

    
166
    def test_update_object_public(self):
167
        self.utils.backend.put_container('account', 'account', 'container')
168
        data = get_random_data(int(random.random()))
169
        self.utils.create_update_object(
170
            'account',
171
            'container',
172
            'object',
173
            'application/octet-stream',
174
            data
175
        )
176

    
177
        self.backend.update_object_public(
178
            'account', 'account', 'container', 'object', public=False
179
        )
180
        self.assert_not_public_object('account', 'container', 'object')
181

    
182
        self.backend.update_object_public(
183
            'account', 'account', 'container', 'object', public=True
184
        )
185
        public = self.assert_public_object('account', 'container', 'object')
186

    
187
        self.backend.update_object_public(
188
            'account', 'account', 'container', 'object', public=False
189
        )
190
        self.assert_not_public_object('account', 'container', 'object')
191

    
192
        self.backend.update_object_public(
193
            'account', 'account', 'container', 'object', public=True
194
        )
195
        new_public = self.assert_public_object('account', 'container', 'object')
196
        self.assertTrue(public != new_public)
197

    
198
    def test_delete_not_public_object(self):
199
        self.utils.backend.put_container('account', 'account', 'container')
200
        data = get_random_data(int(random.random()))
201
        self.utils.create_update_object(
202
            'account',
203
            'container',
204
            'object',
205
            'application/octet-stream',
206
            data
207
        )
208
        self.assert_not_public_object('account', 'container', 'object')
209

    
210
        self.backend.delete_object('account', 'account', 'container', 'object')
211

    
212
        self.assert_not_public_object('account', 'container', 'object')
213

    
214
    def test_delete_public_object(self):
215
        self.utils.backend.put_container('account', 'account', 'container')
216
        data = get_random_data(int(random.random()))
217
        self.utils.create_update_object(
218
            'account',
219
            'container',
220
            'object',
221
            'application/octet-stream',
222
            data
223
        )
224
        self.assert_not_public_object('account', 'container', 'object')
225

    
226
        self.backend.permissions.public_set(
227
            'account/container/object',
228
            self.backend.public_url_security,
229
            self.backend.public_url_alphabet
230
        )
231
        self.assert_public_object('account', 'container', 'object')
232

    
233
        self.backend.delete_object('account', 'account', 'container', 'object')
234
        self.assert_not_public_object('account', 'container', 'object')
235

    
236
    def test_delete_public_object_history(self):
237
        self.utils.backend.put_container('account', 'account', 'container')
238
        for i in range(random.randint(1, 10)):
239
            data = get_random_data(int(random.random()))
240
            self.utils.create_update_object(
241
                'account',
242
                'container',
243
                'object',
244
                'application/octet-stream',
245
                data
246
            )
247
            _time.sleep(1)
248
        versions = self.backend.list_versions(
249
            'account', 'account', 'container', 'object'
250
        )
251
        mtime = [int(i[1]) for i in versions]
252
        self.assert_not_public_object('account', 'container', 'object')
253

    
254
        self.backend.permissions.public_set(
255
            'account/container/object',
256
            self.backend.public_url_security,
257
            self.backend.public_url_alphabet
258
        )
259
        public = self.assert_public_object('account', 'container', 'object')
260

    
261
        i = random.randrange(len(mtime))
262
        self.backend.delete_object(
263
            'account', 'account', 'container', 'object', until=mtime[i]
264
        )
265
        self.assert_public_object('account', 'container', 'object')
266
        public = self.assert_public_object('account', 'container', 'object')
267

    
268
        _time.sleep(1)
269
        t = datetime.datetime.utcnow()
270
        now = int(_time.mktime(t.timetuple()))
271
        self.backend.delete_object(
272
            'account', 'account', 'container', 'object', until=now
273
        )
274
        self.assertRaises(
275
            NameError,
276
            self.backend.get_public,
277
            '$$account$$',
278
            public
279
        )
280

    
281
if __name__ == '__main__':
282
    unittest.main()