Revision ee7a28be

b/snf-pithos-app/pithos/api/test/unicode.py
1
#!/usr/bin/env python
2
#coding=utf8
3

  
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

  
37
from pithos.api.test import PithosAPITest, TEST_BLOCK_SIZE
38
from pithos.api.test.util import get_random_data
39

  
40
from synnefo.lib import join_urls
41

  
42
from urllib import quote
43

  
44
import random
45

  
46

  
47
class TestUnicode(PithosAPITest):
48
    #def setUp(self):
49
    #    super(TestUnicode, self).setUp()
50
    #    self.user = 'χρήστης'
51

  
52
    def test_create_container(self):
53
        cname = 'φάκελος'
54
        self.create_container(cname)
55
        url = join_urls(self.pithos_path, self.user, cname)
56
        r = self.head(url)
57
        self.assertEqual(r.status_code, 204)
58

  
59
        url = join_urls(self.pithos_path, self.user)
60
        r = self.get(url)
61
        self.assertEqual(r.status_code, 200)
62
        containers = r.content.split('\n')
63
        self.assertTrue('φάκελος' in containers)
64

  
65
    def test_create_object(self):
66
        cname = 'φάκελος'
67
        oname = 'αντικείμενο'
68
        self.create_container(cname)
69
        odata = self.upload_object(cname, oname)[1]
70

  
71
        url = join_urls(self.pithos_path, self.user, cname, oname)
72
        r = self.head(url)
73
        self.assertEqual(r.status_code, 200)
74

  
75
        r = self.get(url)
76
        self.assertEqual(r.status_code, 200)
77
        self.assertEqual(r.content, odata)
78

  
79
        url = join_urls(self.pithos_path, self.user, cname)
80
        r = self.get(url)
81
        objects = r.content.split('\n')
82
        self.assertEqual(r.status_code, 200)
83
        self.assertTrue('αντικείμενο' in objects)
84

  
85
    def test_copy_object(self):
86
        src_container = 'φάκελος'
87
        src_object = 'αντικείμενο'
88
        dest_container = 'αντίγραφα'
89
        dest_object = 'ασφαλές-αντίγραφο'
90

  
91
        self.create_container(src_container)
92
        self.upload_object(src_container, src_object)
93

  
94
        self.create_container(dest_container)
95
        url = join_urls(self.pithos_path, self.user, dest_container,
96
                        dest_object)
97
        self.put(url, data='',
98
                 HTTP_X_COPY_FROM='/%s/%s' % (src_container, src_object))
99

  
100
        # assert destination object exists
101
        r = self.head(url)
102
        self.assertEqual(r.status_code, 200)
103
        r = self.get(url)
104
        self.assertEqual(r.status_code, 200)
105

  
106
        # assert source object exists
107
        url = join_urls(self.pithos_path, self.user, src_container,
108
                        src_object)
109
        r = self.head(url)
110
        self.assertEqual(r.status_code, 200)
111
        r = self.get(url)
112
        self.assertEqual(r.status_code, 200)
113

  
114
        # list source container objects
115
        url = join_urls(self.pithos_path, self.user, src_container)
116
        r = self.get(url)
117
        self.assertEqual(r.status_code, 200)
118
        objects = r.content.split('\n')
119
        self.assertTrue(src_object in objects)
120
        self.assertTrue(dest_object not in objects)
121

  
122
        # list destination container objects
123
        url = join_urls(self.pithos_path, self.user, dest_container)
124
        r = self.get(url)
125
        self.assertEqual(r.status_code, 200)
126
        objects = r.content.split('\n')
127
        self.assertTrue(src_object not in objects)
128
        self.assertTrue(dest_object in objects)
129

  
130
    def test_move_object(self):
131
        src_container = 'φάκελος'
132
        src_object = 'αντικείμενο'
133
        dest_container = 'αντίγραα'
134
        dest_object = 'ασφαλές-αντίγραφο'
135

  
136
        self.create_container(src_container)
137
        self.upload_object(src_container, src_object)
138

  
139
        self.create_container(dest_container)
140
        url = join_urls(self.pithos_path, self.user, dest_container,
141
                        dest_object)
142
        self.put(url, data='',
143
                 HTTP_X_MOVE_FROM='/%s/%s' % (src_container, src_object))
144

  
145
        # assert destination object exists
146
        r = self.head(url)
147
        self.assertEqual(r.status_code, 200)
148
        r = self.get(url)
149
        self.assertEqual(r.status_code, 200)
150

  
151
        # assert source object does not exist
152
        url = join_urls(self.pithos_path, self.user, src_container,
153
                        src_object)
154
        r = self.head(url)
155
        self.assertEqual(r.status_code, 404)
156
        r = self.get(url)
157
        self.assertEqual(r.status_code, 404)
158

  
159
        # list source container objects
160
        url = join_urls(self.pithos_path, self.user, src_container)
161
        r = self.get(url)
162
        self.assertEqual(r.status_code, 204)
163
        objects = r.content.split('\n')
164
        self.assertTrue(src_object not in objects)
165
        self.assertTrue(dest_object not in objects)
166

  
167
        # list destination container objects
168
        url = join_urls(self.pithos_path, self.user, dest_container)
169
        r = self.get(url)
170
        self.assertEqual(r.status_code, 200)
171
        objects = r.content.split('\n')
172
        self.assertTrue(src_object not in objects)
173
        self.assertTrue(dest_object in objects)
174

  
175
    def test_delete_object(self):
176
        self.create_container('φάκελος')
177
        self.upload_object('φάκελος', 'αντικείμενο')
178
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
179
        r = self.get(url)
180
        self.assertEqual(r.status_code, 200)
181
        objects = r.content.split('\n')
182
        self.assertTrue('αντικείμενο' in objects)
183

  
184
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'αντικείμενο')
185
        r = self.head(url)
186
        self.assertEqual(r.status_code, 200)
187
        r = self.get(url)
188
        self.assertEqual(r.status_code, 200)
189

  
190
        r = self.delete(url)
191
        r = self.head(url)
192
        self.assertEqual(r.status_code, 404)
193
        r = self.get(url)
194
        self.assertEqual(r.status_code, 404)
195
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
196
        r = self.get(url)
197
        objects = r.content.split('\n')
198
        self.assertTrue('αντικείμενο' not in objects)
199

  
200
    def test_delete_container(self):
201
        self.create_container('φάκελος')
202
        url = join_urls(self.pithos_path, self.user)
203
        r = self.get(url)
204
        containers = r.content.split('\n')
205
        self.assertTrue('φάκελος' in containers)
206

  
207
        self.upload_object('φάκελος', 'αντικείμενο')
208
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
209
        r = self.get(url)
210
        self.assertEqual(r.status_code, 200)
211
        objects = r.content.split('\n')
212
        self.assertTrue('αντικείμενο' in objects)
213

  
214
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'αντικείμενο')
215
        r = self.head(url)
216
        self.assertEqual(r.status_code, 200)
217
        r = self.get(url)
218
        self.assertEqual(r.status_code, 200)
219

  
220
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
221
        r = self.delete(url)
222
        self.assertEqual(r.status_code, 409)
223

  
224
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'αντικείμενο')
225
        r = self.delete(url)
226
        r = self.head(url)
227
        self.assertEqual(r.status_code, 404)
228
        r = self.get(url)
229
        self.assertEqual(r.status_code, 404)
230
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
231
        r = self.get(url)
232
        objects = r.content.split('\n')
233
        self.assertTrue('αντικείμενο' not in objects)
234

  
235
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
236
        r = self.delete(url)
237
        self.assertEqual(r.status_code, 204)
238

  
239
        url = join_urls(self.pithos_path, self.user)
240
        r = self.get(url)
241
        containers = r.content.split('\n')
242
        self.assertTrue('φάκελος' not in containers)
243

  
244
    def test_account_meta(self):
245
        url = join_urls(self.pithos_path, self.user)
246
        headers = {'HTTP_X_ACCOUNT_META_Ποιότητα': 'Ααα'}
247
        r = self.post(url, content_type='', **headers)
248
        self.assertEqual(r.status_code, 202)
249

  
250
        meta = self.get_account_meta()
251
        self.assertTrue('Ποιότητα' in meta)
252
        self.assertEqual(meta['Ποιότητα'], 'Ααα')
253

  
254
    def test_container_meta(self):
255
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
256
        headers = {'HTTP_X_CONTAINER_META_Ποιότητα': 'Ααα'}
257
        r = self.put(url, data='', **headers)
258
        self.assertEqual(r.status_code, 201)
259

  
260
        meta = self.get_container_meta('φάκελος')
261
        self.assertTrue('Ποιότητα' in meta)
262
        self.assertEqual(meta['Ποιότητα'], 'Ααα')
263

  
264
    def test_object_meta(self):
265
        self.create_container('φάκελος')
266
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'αντικείμενο')
267
        headers = {'HTTP_X_OBJECT_META_Ποιότητα': 'Ααα'}
268
        r = self.put(url, data=get_random_data(), **headers)
269
        self.assertEqual(r.status_code, 201)
270

  
271
        meta = self.get_object_meta('φάκελος', 'αντικείμενο')
272
        self.assertTrue('Ποιότητα' in meta)
273
        self.assertEqual(meta['Ποιότητα'], 'Ααα')
274

  
275
    def test_list_meta_filtering(self):
276
        self.create_container('φάκελος')
277
        meta = {'ποιότητα': 'Ααα'}
278
        self.upload_object('φάκελος', 'ο1', **meta)
279
        self.upload_object('φάκελος', 'ο2')
280
        self.upload_object('φάκελος', 'ο3')
281

  
282
        meta = {'ποσότητα': 'μεγάλη'}
283
        self.update_object_meta('φάκελος', 'ο2', meta)
284

  
285
        url = join_urls(self.pithos_path, self.user, 'φάκελος')
286
        r = self.get('%s?meta=%s' % (url, quote('ποιότητα, ποσότητα')))
287
        self.assertEqual(r.status_code, 200)
288
        objects = r.content.split('\n')
289
        if '' in objects:
290
            objects.remove('')
291
        self.assertEquals(objects, ['ο1', 'ο2'])
292

  
293
        r = self.get('%s?meta=%s' % (url, quote('!ποιότητα')))
294
        self.assertEqual(r.status_code, 200)
295
        objects = r.content.split('\n')
296
        if '' in objects:
297
            objects.remove('')
298
        self.assertEquals(objects, ['ο2', 'ο3'])
299

  
300
        r = self.get('%s?meta=%s' % (url, quote('!ποιότητα, !ποσότητα')))
301
        self.assertEqual(r.status_code, 200)
302
        objects = r.content.split('\n')
303
        if '' in objects:
304
            objects.remove('')
305
        self.assertEquals(objects, ['ο3'])
306

  
307
        meta = {'ποιότητα': 'ΑΒ'}
308
        self.update_object_meta('φάκελος', 'ο2', meta)
309
        r = self.get('%s?meta=%s' % (url, quote('ποιότητα=Ααα')))
310
        self.assertEqual(r.status_code, 200)
311
        objects = r.content.split('\n')
312
        if '' in objects:
313
            objects.remove('')
314
        self.assertEquals(objects, ['ο1'])
315

  
316
        r = self.get('%s?meta=%s' % (url, quote('ποιότητα!=Ααα')))
317
        self.assertEqual(r.status_code, 200)
318
        objects = r.content.split('\n')
319
        if '' in objects:
320
            objects.remove('')
321
        self.assertEquals(objects, ['ο2'])
322

  
323
        meta = {'έτος': '2011'}
324
        self.update_object_meta('φάκελος', 'ο3', meta)
325
        meta = {'έτος': '2012'}
326
        self.update_object_meta('φάκελος', 'ο2', meta)
327
        r = self.get('%s?meta=%s' % (url, quote('έτος<2012')))
328
        self.assertEqual(r.status_code, 200)
329
        objects = r.content.split('\n')
330
        if '' in objects:
331
            objects.remove('')
332
        self.assertEquals(objects, ['ο3'])
333

  
334
        r = self.get('%s?meta=%s' % (url, quote('έτος<=2012')))
335
        self.assertEqual(r.status_code, 200)
336
        objects = r.content.split('\n')
337
        if '' in objects:
338
            objects.remove('')
339
        self.assertEquals(objects, ['ο2', 'ο3'])
340

  
341
        r = self.get('%s?meta=%s' % (url, quote('έτος<=2012, έτος!=2011')))
342
        self.assertEqual(r.status_code, 200)
343
        objects = r.content.split('\n')
344
        if '' in objects:
345
            objects.remove('')
346
        self.assertEquals(objects, ['ο2'])
347

  
348
        r = self.get('%s?meta=%s' % (url, quote('έτος<2012, έτος!=2011')))
349
        self.assertEqual(r.status_code, 204)
350
        objects = r.content.split('\n')
351
        if '' in objects:
352
            objects.remove('')
353
        self.assertEquals(objects, [])
354

  
355
    def test_groups(self):
356
        # create a group
357
        headers = {'HTTP_X_ACCOUNT_GROUP_γκρουπ': 'chazapis,διογένης'}
358
        url = join_urls(self.pithos_path, self.user)
359
        r = self.post(url, **headers)
360
        self.assertEqual(r.status_code, 202)
361

  
362
        groups = self.get_account_groups()
363
        self.assertTrue('γκρουπ' in groups)
364
        self.assertEqual(groups['γκρουπ'], 'chazapis,διογένης')
365

  
366
        # check read access
367
        self.create_container('φάκελος')
368
        odata = self.upload_object('φάκελος', 'ο1')[1]
369

  
370
        r = self.head(url, user='διογένης')
371
        self.assertEqual(r.status_code, 403)
372
        r = self.get(url, user='διογένης')
373
        self.assertEqual(r.status_code, 403)
374

  
375
        # share for read
376
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'ο1')
377
        r = self.post(url, content_type='',
378
                      HTTP_X_OBJECT_SHARING='read=%s:γκρουπ' % self.user)
379
        self.assertEqual(r.status_code, 202)
380

  
381
        r = self.head(url, user='διογένης')
382
        self.assertEqual(r.status_code, 200)
383
        r = self.get(url, user='διογένης')
384
        self.assertEqual(r.status_code, 200)
385

  
386
        # check write access
387
        appended_data = get_random_data()
388
        r = self.post(url, user='διογένης',  data=appended_data,
389
                      content_type='application/octet-stream',
390
                      HTTP_CONTENT_LENGTH=str(len(appended_data)),
391
                      HTTP_CONTENT_RANGE='bytes */*')
392
        self.assertEqual(r.status_code, 403)
393

  
394
        # share for write
395
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'ο1')
396
        r = self.post(url, content_type='',
397
                      HTTP_X_OBJECT_SHARING='write=%s:γκρουπ' % self.user)
398
        self.assertEqual(r.status_code, 202)
399

  
400
        r = self.post(url, user='διογένης', data=appended_data,
401
                      content_type='application/octet-stream',
402
                      HTTP_CONTENT_LENGTH=str(len(appended_data)),
403
                      HTTP_CONTENT_RANGE='bytes */*')
404
        self.assertEqual(r.status_code, 204)
405

  
406
        r = self.get(url, user='διογένης')
407
        self.assertEqual(r.status_code, 200)
408
        self.assertEqual(r.content, odata + appended_data)
409

  
410
    def test_manifestation(self):
411
        self.create_container('κουβάς')
412
        prefix = 'μέρη/'
413
        data = ''
414
        for i in range(5):
415
            part = '%s%d' % (prefix, i)
416
            data += self.upload_object('κουβάς', part)[1]
417

  
418
        self.create_container('φάκελος')
419
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'άπαντα')
420
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST='κουβάς/%s' % prefix)
421
        self.assertEqual(r.status_code, 201)
422

  
423
        r = self.head(url)
424
        self.assertEqual(r.status_code, 200)
425

  
426
        r = self.get(url)
427
        self.assertEqual(r.status_code, 200)
428
        self.assertEqual(r.content, data)
429

  
430
        # wrong manifestation
431
        url = join_urls(self.pithos_path, self.user, 'φάκελος', 'άπαντα')
432
        r = self.put(url, data='', HTTP_X_OBJECT_MANIFEST='κουβάς/λάθος')
433
        self.assertEqual(r.status_code, 201)
434

  
435
        r = self.get(url)
436
        self.assertEqual(r.status_code, 200)
437
        self.assertTrue(r.content != data)
438

  
439
    def test_update_from_another_object(self):
440
        self.create_container('κουβάς')
441
        initial_data = self.upload_object('κουβάς', 'νέο')[1]
442
        length = TEST_BLOCK_SIZE + random.randint(1, TEST_BLOCK_SIZE - 1)
443
        src_data = self.upload_object('κουβάς', 'πηγή', length=length)[1]
444

  
445
        url = join_urls(self.pithos_path, self.user, 'κουβάς', 'νέο')
446
        r = self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
447
                      HTTP_X_SOURCE_OBJECT='/κουβάς/πηγή')
448
        self.assertEqual(r.status_code, 204)
449

  
450
        r = self.get(url)
451
        self.assertEqual(r.status_code, 200)
452
        self.assertEqual(r.content, initial_data + src_data)
b/snf-pithos-app/pithos/api/tests.py
38 38
from pithos.api.test.permissions import *
39 39
from pithos.api.test.public import *
40 40
from pithos.api.test.views import *
41
from pithos.api.test.unicode import *
41 42
from pithos.api.test.listing import *

Also available in: Unified diff