Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / containers.py @ 2560c061

History | View | Annotate | Download (39 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data, get_random_name
40

    
41
from synnefo.lib import join_urls
42

    
43
import django.utils.simplejson as json
44
from django.http import urlencode
45

    
46
from xml.dom import minidom
47
from urllib import quote
48
import time as _time
49

    
50
import random
51
import datetime
52

    
53

    
54
class ContainerHead(PithosAPITest):
55
    def test_get_meta(self):
56
        self.create_container('apples')
57

    
58
        # populate with objects
59
        objects = {}
60
        for i in range(random.randint(1, 100)):
61

    
62
            # upload object
63
            meta = {'foo%s' % i: 'bar'}
64
            name, data, resp = self.upload_object('apples', **meta)
65
            objects[name] = data
66

    
67
        t1 = datetime.datetime.utcnow()
68
        url = join_urls(self.pithos_path, self.user, 'apples')
69
        r = self.head(url)
70
        self.assertEqual(int(r['X-Container-Object-Count']), len(objects))
71
        self.assertEqual(int(r['X-Container-Bytes-Used']),
72
                         sum([len(i) for i in objects.values()]))
73
        self.assertTrue('X-Container-Block-Size' in r)
74
        self.assertTrue('X-Container-Block-Hash' in r)
75
        self.assertTrue('X-Container-Until-Timestamp' not in r)
76
        self.assertEqual(r['X-Container-Policy-Versioning'], 'auto')
77
        self.assertEqual(int(r['X-Container-Policy-Quota']), 0)
78
        t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
79
        delta = (t2 - t1)
80
        threashold = datetime.timedelta(seconds=1)
81
        self.assertTrue(delta < threashold)
82
        self.assertTrue(r['X-Container-Object-Meta'])
83
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
84
            for i in range(len(objects)))
85

    
86
    def test_get_container_meta_until(self):
87
        self.create_container('apples')
88

    
89
        # populate with objects
90
        objects = {}
91
        metalist = []
92
        for i in range(random.randint(1, 100)):
93
            # upload object
94
            metakey = 'Foo%s' % i
95
            meta = {metakey: 'bar'}
96
            name, data, resp = self.upload_object('apples', **meta)
97
            objects[name] = data
98
            metalist.append(metakey)
99

    
100
        self.update_container_meta('apples', {'foo': 'bar'})
101

    
102
        container_info = self.get_container_info('apples')
103
        t = datetime.datetime.strptime(container_info['Last-Modified'],
104
                                       DATE_FORMATS[2])
105
        t1 = t + datetime.timedelta(seconds=1)
106
        until = int(_time.mktime(t1.timetuple()))
107

    
108
        _time.sleep(2)
109

    
110
        for i in range(random.randint(1, 100)):
111
            # upload object
112
            meta = {'foo%s' % i: 'bar'}
113
            self.upload_object('apples', **meta)
114

    
115
        self.update_container_meta('apples', {'quality': 'AAA'})
116

    
117
        container_info = self.get_container_info('apples')
118
        self.assertTrue('X-Container-Meta-Quality' in container_info)
119
        self.assertTrue('X-Container-Meta-Foo' in container_info)
120
        self.assertTrue('X-Container-Object-Count' in container_info)
121
        self.assertTrue(int(container_info['X-Container-Object-Count']) >
122
                        len(objects))
123
        self.assertTrue('X-Container-Bytes-Used' in container_info)
124

    
125
        t = datetime.datetime.strptime(container_info['Last-Modified'],
126
                                       DATE_FORMATS[-1])
127
        last_modified = int(_time.mktime(t.timetuple()))
128
        assert until < last_modified
129

    
130
        container_info = self.get_container_info('apples', until=until)
131
        self.assertTrue('X-Container-Meta-Quality' not in container_info)
132
        self.assertTrue('X-Container-Meta-Foo' in container_info)
133
        self.assertTrue('X-Container-Until-Timestamp' in container_info)
134
        t = datetime.datetime.strptime(
135
            container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2])
136
        self.assertTrue(int(_time.mktime(t1.timetuple())) <= until)
137
        self.assertTrue('X-Container-Object-Count' in container_info)
138
        self.assertEqual(int(container_info['X-Container-Object-Count']),
139
                         len(objects))
140
        self.assertTrue('X-Container-Bytes-Used' in container_info)
141
        self.assertEqual(int(container_info['X-Container-Bytes-Used']),
142
                         sum([len(data) for data in objects.values()]))
143
        self.assertTrue('X-Container-Object-Meta' in container_info)
144
        self.assertEqual(
145
            sorted(container_info['X-Container-Object-Meta'].split(',')),
146
            sorted(metalist))
147

    
148

    
149
class ContainerGet(PithosAPITest):
150
    def setUp(self):
151
        PithosAPITest.setUp(self)
152

    
153
        self.cnames = ['pears', 'apples']
154
        self.objects = {}
155
        for c in self.cnames:
156
            self.create_container(c)
157

    
158
        self.objects['pears'] = {}
159
        for o in o_names[:8]:
160
            name, data, resp = self.upload_object('pears', o)
161
            self.objects['pears'][name] = data
162
        self.objects['apples'] = {}
163
        for o in o_names[8:]:
164
            name, data, resp = self.upload_object('apples', o)
165
            self.objects['apples'][name] = data
166

    
167
    def test_list_until(self):
168
        account_info = self.get_account_info()
169
        t = datetime.datetime.strptime(account_info['Last-Modified'],
170
                                       DATE_FORMATS[2])
171
        t1 = t + datetime.timedelta(seconds=1)
172
        until = int(_time.mktime(t1.timetuple()))
173

    
174
        _time.sleep(2)
175

    
176
        cname = self.cnames[0]
177
        self.upload_object(cname)
178

    
179
        url = join_urls(self.pithos_path, self.user, cname)
180
        r = self.get('%s?until=%s' % (url, until))
181
        self.assertTrue(r.status_code, 200)
182
        objects = r.content.split('\n')
183
        if '' in objects:
184
            objects.remove('')
185
        self.assertEqual(objects,
186
                         sorted(self.objects[cname].keys()))
187

    
188
        r = self.get('%s?until=%s&format=json' % (url, until))
189
        self.assertTrue(r.status_code, 200)
190
        try:
191
            objects = json.loads(r.content)
192
        except:
193
            self.fail('json format expected')
194
        self.assertEqual([o['name'] for o in objects],
195
                         sorted(self.objects[cname].keys()))
196

    
197
    def test_list_shared(self):
198
        # share an object
199
        cname = self.cnames[0]
200
        onames = self.objects[cname].keys()
201
        shared1 = onames.pop()
202
        url = join_urls(self.pithos_path, self.user, cname, shared1)
203
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
204
        self.assertEqual(r.status_code, 202)
205

    
206
        # share another object
207
        shared2 = onames.pop()
208
        url = join_urls(self.pithos_path, self.user, cname, shared2)
209
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
210
        self.assertEqual(r.status_code, 202)
211

    
212
        # publish another object
213
        public1 = onames.pop()
214
        url = join_urls(self.pithos_path, self.user, cname, public1)
215
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
216
        self.assertEqual(r.status_code, 202)
217

    
218
        # publish another object
219
        public2 = onames.pop()
220
        url = join_urls(self.pithos_path, self.user, cname, public2)
221
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
222
        self.assertEqual(r.status_code, 202)
223

    
224
        # list shared and assert only the shared object is returned
225
        url = join_urls(self.pithos_path, self.user, cname)
226
        r = self.get('%s?shared=' % url)
227
        self.assertEqual(r.status_code, 200)
228
        objects = r.content.split('\n')
229
        if '' in objects:
230
            objects.remove('')
231
        self.assertEqual(sorted([shared1, shared2]), objects)
232

    
233
        # list detailed shared and assert only the shared object is returned
234
        url = join_urls(self.pithos_path, self.user, cname)
235
        r = self.get('%s?shared=&format=json' % url)
236
        self.assertEqual(r.status_code, 200)
237
        try:
238
            objects = json.loads(r.content)
239
        except:
240
            self.fail('json format expected')
241
        l = sorted([shared1, shared2])
242
        i = 0
243
        for name in l:
244
            self.assertEqual(objects[i]['name'], name)
245
            self.assertEqual(objects[i]['bytes'],
246
                             len(self.objects[cname][name]))
247
            self.assertTrue('x_object_sharing' in objects[i])
248
            self.assertTrue('x_object_public' not in objects[i])
249
            i += 1
250

    
251
        # publish the shared object and assert it is still listed in the
252
        # shared objects
253
        url = join_urls(self.pithos_path, self.user, cname, shared1)
254
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
255
        self.assertEqual(r.status_code, 202)
256
        url = join_urls(self.pithos_path, self.user, cname)
257
        r = self.get('%s?shared=&format=json' % url)
258
        self.assertEqual(r.status_code, 200)
259
        try:
260
            objects = json.loads(r.content)
261
        except:
262
            self.fail('json format expected')
263
        i = 0
264
        for name in l:
265
            self.assertEqual(objects[i]['name'], name)
266
            self.assertEqual(objects[i]['bytes'],
267
                             len(self.objects[cname][name]))
268
            self.assertTrue('x_object_sharing' in objects[i])
269
            if name == shared1:
270
                self.assertTrue('x_object_public' in objects[i])
271
            else:
272
                self.assertTrue('x_object_public' not in objects[i])
273
            i += 1
274

    
275
        # create child object
276
        descendant = strnextling(shared1)
277
        self.upload_object(cname, descendant)
278
        # request shared and assert child obejct is not listed
279
        url = join_urls(self.pithos_path, self.user, cname)
280
        r = self.get('%s?shared=' % url)
281
        self.assertEqual(r.status_code, 200)
282
        objects = r.content.split('\n')
283
        if '' in objects:
284
            objects.remove('')
285
        self.assertTrue(shared1 in objects)
286
        self.assertTrue(descendant not in objects)
287

    
288
        # check folder inheritance
289
        folder, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
290
        # create child object
291
        descendant = '%s/%s' % (folder, get_random_name())
292
        self.upload_object(cname, descendant)
293
        # request shared
294
        url = join_urls(self.pithos_path, self.user, cname)
295
        r = self.get('%s?shared=' % url)
296
        self.assertEqual(r.status_code, 200)
297
        objects = r.content.split('\n')
298
        if '' in objects:
299
            objects.remove('')
300
        self.assertTrue(folder in objects)
301
        self.assertTrue(descendant in objects)
302

    
303
    def test_list_public(self):
304
        cname = self.cnames[0]
305
        onames = self.objects[cname].keys()
306

    
307
        # publish an object
308
        public1 = onames.pop()
309
        url = join_urls(self.pithos_path, self.user, cname, public1)
310
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
311
        self.assertEqual(r.status_code, 202)
312

    
313
        # publish another
314
        public2 = onames.pop()
315
        url = join_urls(self.pithos_path, self.user, cname, public2)
316
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
317
        self.assertEqual(r.status_code, 202)
318

    
319
        # share an object
320
        shared1 = onames.pop()
321
        url = join_urls(self.pithos_path, self.user, cname, shared1)
322
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
323
        self.assertEqual(r.status_code, 202)
324

    
325
        # share another
326
        shared2 = onames.pop()
327
        url = join_urls(self.pithos_path, self.user, cname, shared2)
328
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
329
        self.assertEqual(r.status_code, 202)
330

    
331
        # list public and assert only the public object is returned
332
        url = join_urls(self.pithos_path, self.user, cname)
333
        r = self.get('%s?public=' % url)
334
        self.assertEqual(r.status_code, 200)
335
        objects = r.content.split('\n')
336
        if '' in objects:
337
            objects.remove('')
338
        self.assertEqual(sorted([public1, public2]), objects)
339

    
340
        # list detailed public and assert only the public object is returned
341
        url = join_urls(self.pithos_path, self.user, cname)
342
        r = self.get('%s?public=&format=json' % url)
343
        self.assertEqual(r.status_code, 200)
344
        try:
345
            objects = json.loads(r.content)
346
        except:
347
            self.fail('json format expected')
348
        l = sorted([public1, public2])
349
        i = 0
350
        for name in l:
351
            self.assertEqual(objects[i]['name'], name)
352
            self.assertEqual(objects[i]['bytes'],
353
                             len(self.objects[cname][name]))
354
            self.assertTrue('x_object_sharing' not in objects[i])
355
            self.assertTrue('x_object_public' in objects[i])
356
            i += 1
357

    
358
        # share the public object and assert it is still listed in the
359
        # public objects
360
        url = join_urls(self.pithos_path, self.user, cname, public1)
361
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
362
        self.assertEqual(r.status_code, 202)
363
        url = join_urls(self.pithos_path, self.user, cname)
364
        r = self.get('%s?public=&format=json' % url)
365
        self.assertEqual(r.status_code, 200)
366
        try:
367
            objects = json.loads(r.content)
368
        except:
369
            self.fail('json format expected')
370
        i = 0
371
        for name in l:
372
            self.assertEqual(objects[i]['name'], name)
373
            self.assertEqual(objects[i]['bytes'],
374
                             len(self.objects[cname][name]))
375
            if name == public1:
376
                self.assertTrue('x_object_sharing' in objects[i])
377
            else:
378
                self.assertTrue('x_object_sharing' not in objects[i])
379
            i += 1
380

    
381
        url = join_urls(self.pithos_path, self.user, cname)
382

    
383
        # Assert listing the container public contents is forbidden to not
384
        # shared users
385
        r = self.get('%s?public=&format=json' % url, user='bob')
386
        self.assertEqual(r.status_code, 403)
387

    
388
        # Assert forbidden public object listing to shared users
389
        r = self.get('%s?public=&format=json' % url, user='alice')
390
        self.assertEqual(r.status_code, 403)
391

    
392
        # create child object
393
        descendant = strnextling(public1)
394
        self.upload_object(cname, descendant)
395
        # request public and assert child obejct is not listed
396
        r = self.get('%s?public=' % url)
397
        objects = r.content.split('\n')
398
        if '' in objects:
399
            objects.remove('')
400
        self.assertEqual(r.status_code, 200)
401
        self.assertTrue(public1 in objects)
402
        self.assertTrue(descendant not in objects)
403

    
404
        # test folder inheritance
405
        folder, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
406
        # create child object
407
        descendant = '%s/%s' % (folder, get_random_name())
408
        self.upload_object(cname, descendant)
409
        # request public
410
        r = self.get('%s?public=' % url)
411
        self.assertEqual(r.status_code, 200)
412
        objects = r.content.split('\n')
413
        self.assertTrue(folder in objects)
414
        self.assertTrue(descendant not in objects)
415

    
416
    def test_list_shared_public(self):
417
        cname = self.cnames[0]
418
        container_url = join_urls(self.pithos_path, self.user, cname)
419
        onames = self.objects[cname].keys()
420

    
421
        r = self.get('%s?shared=&public=&format=json' % container_url)
422
        self.assertEqual(r.status_code, 200)
423
        objects = json.loads(r.content)
424
        self.assertEqual(len(objects), 0)
425

    
426
        # publish an object
427
        public1 = onames.pop()
428
        url = join_urls(container_url, public1)
429
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
430
        self.assertEqual(r.status_code, 202)
431

    
432
        r = self.get('%s?shared=&public=&format=json' % container_url)
433
        self.assertEqual(r.status_code, 200)
434
        objects = json.loads(r.content)
435
        self.assertEqual(len(objects), 1)
436
        self.assertEqual(objects[0]['name'], public1)
437
        self.assertEqual(objects[0]['bytes'],
438
                         len(self.objects[cname][public1]))
439
        self.assertTrue('x_object_public' in objects[0])
440

    
441
        # publish another
442
        public2 = onames.pop()
443
        url = join_urls(container_url, public2)
444
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
445
        self.assertEqual(r.status_code, 202)
446

    
447
        # share an object
448
        shared1 = onames.pop()
449
        url = join_urls(container_url, shared1)
450
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
451
        self.assertEqual(r.status_code, 202)
452

    
453
        # share another
454
        shared2 = onames.pop()
455
        url = join_urls(container_url, shared2)
456
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
457
        self.assertEqual(r.status_code, 202)
458

    
459
        # list shared and public objects and assert object is listed
460
        r = self.get('%s?shared=&public=&format=json' % container_url)
461
        self.assertEqual(r.status_code, 200)
462
        objects = json.loads(r.content)
463
        l = sorted([public1, public2, shared1, shared2])
464
        i = 0
465
        for name in l:
466
            self.assertEqual(objects[i]['name'], name)
467
            self.assertEqual(objects[i]['bytes'],
468
                             len(self.objects[cname][name]))
469
            self.assertTrue('x_object_sharing' in objects[i] or
470
                            'x_object_public' in objects[i])
471
            i += 1
472

    
473
        # assert not listing shared and public to a not shared user
474
        r = self.get('%s?shared=&public=&format=json' % container_url,
475
                     user='bob')
476
        self.assertEqual(r.status_code, 403)
477

    
478
        # assert not listing public to a shared user
479
        r = self.get('%s?shared=&public=&format=json' % container_url,
480
                     user='alice')
481
        self.assertEqual(r.status_code, 403)
482

    
483
        # create child object
484
        descendant = strnextling(public1)
485
        self.upload_object(cname, descendant)
486
        # request public and assert child object is not listed
487
        r = self.get('%s?shared=&public=' % container_url)
488
        objects = r.content.split('\n')
489
        if '' in objects:
490
            objects.remove('')
491
        self.assertEqual(r.status_code, 200)
492
        self.assertEqual(objects, l)
493

    
494
        # test folder inheritance
495
        folder, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
496
        # create child object
497
        descendant = '%s/%s' % (folder, get_random_name())
498
        self.upload_object(cname, descendant)
499
        # request public
500
        r = self.get('%s?shared=&public=' % container_url)
501
        self.assertEqual(r.status_code, 200)
502
        objects = r.content.split('\n')
503
        if '' in objects:
504
            objects.remove('')
505
        self.assertTrue(folder in objects)
506
        self.assertTrue(descendant not in objects)
507

    
508
        # unpublish public1
509
        url = join_urls(container_url, public1)
510
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
511
        self.assertEqual(r.status_code, 202)
512

    
513
        # unpublish public2
514
        url = join_urls(container_url, public2)
515
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
516
        self.assertEqual(r.status_code, 202)
517

    
518
        # unpublish folder
519
        url = join_urls(container_url, folder)
520
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='false')
521
        self.assertEqual(r.status_code, 202)
522

    
523
        r = self.get('%s?shared=&public=' % container_url)
524
        self.assertEqual(r.status_code, 200)
525
        objects = r.content.split('\n')
526
        if '' in objects:
527
            objects.remove('')
528
        l = sorted([shared1, shared2])
529
        self.assertEqual(objects, l)
530

    
531
    def test_list_objects(self):
532
        cname = self.cnames[0]
533
        url = join_urls(self.pithos_path, self.user, cname)
534
        r = self.get(url)
535
        self.assertTrue(r.status_code, 200)
536
        objects = r.content.split('\n')
537
        if '' in objects:
538
            objects.remove('')
539
        self.assertEqual(objects, sorted(self.objects[cname].keys()))
540

    
541
    def test_list_objects_containing_slash(self):
542
        self.create_container('test')
543
        self.upload_object('test', quote('/objectname', ''))
544

    
545
        url = join_urls(self.pithos_path, self.user, 'test')
546

    
547
        r = self.get(url)
548
        objects = r.content.split('\n')
549
        if '' in objects:
550
            objects.remove('')
551
        self.assertEqual(objects, ['/objectname'])
552

    
553
        r = self.get('%s?format=json' % url)
554
        try:
555
            objects = json.loads(r.content)
556
        except:
557
            self.fail('json format expected')
558
        self.assertEqual([o['name'] for o in objects], ['/objectname'])
559

    
560
        r = self.get('%s?format=xml' % url)
561
        try:
562
            objects = minidom.parseString(r.content)
563
        except:
564
            self.fail('xml format expected')
565
        self.assertEqual(
566
            [n.firstChild.data for n in objects.getElementsByTagName('name')],
567
            ['/objectname'])
568

    
569
    def test_list_objects_with_limit_marker(self):
570
        cname = self.cnames[0]
571
        url = join_urls(self.pithos_path, self.user, cname)
572
        r = self.get('%s?limit=qwert' % url)
573
        self.assertTrue(r.status_code != 500)
574

    
575
        r = self.get('%s?limit=2' % url)
576
        self.assertEqual(r.status_code, 200)
577
        objects = r.content.split('\n')
578
        if '' in objects:
579
            objects.remove('')
580

    
581
        onames = sorted(self.objects[cname].keys())
582
        self.assertEqual(objects, onames[:2])
583

    
584
        markers = ['How To Win Friends And Influence People.pdf',
585
                   'moms_birthday.jpg']
586
        limit = 4
587
        for m in markers:
588
            r = self.get('%s?limit=%s&marker=%s' % (url, limit, m))
589
            objects = r.content.split('\n')
590
            if '' in objects:
591
                objects.remove('')
592
            start = onames.index(m) + 1
593
            end = start + limit
594
            end = end if len(onames) >= end else len(onames)
595
            self.assertEqual(objects, onames[start:end])
596

    
597
    @pithos_test_settings(API_LIST_LIMIT=10)
598
    def test_list_limit_exceeds(self):
599
        self.create_container('container')
600
        url = join_urls(self.pithos_path, self.user, 'container')
601

    
602
        for _ in range(pithos_settings.API_LIST_LIMIT + 1):
603
            self.upload_object('container')
604

    
605
        r = self.get('%s?format=json' % url)
606
        try:
607
            objects = json.loads(r.content)
608
        except:
609
            self.fail('json format expected')
610
        self.assertEqual(pithos_settings.API_LIST_LIMIT,
611
                         len(objects))
612

    
613
    def test_list_pseudo_hierarchical_folders(self):
614
        url = join_urls(self.pithos_path, self.user, 'apples')
615
        r = self.get('%s?prefix=photos&delimiter=/' % url)
616
        self.assertEqual(r.status_code, 200)
617
        objects = r.content.split('\n')
618
        if '' in objects:
619
            objects.remove('')
620
        self.assertEquals(
621
            ['photos/animals/', 'photos/me.jpg', 'photos/plants/'],
622
            objects)
623

    
624
        r = self.get('%s?prefix=photos/animals&delimiter=/' % url)
625
        objects = r.content.split('\n')
626
        if '' in objects:
627
            objects.remove('')
628
        self.assertEquals(
629
            ['photos/animals/cats/', 'photos/animals/dogs/'], objects)
630

    
631
        r = self.get('%s?path=photos' % url)
632
        objects = r.content.split('\n')
633
        if '' in objects:
634
            objects.remove('')
635
        self.assertEquals(['photos/me.jpg'], objects)
636

    
637
    def test_extended_list_json(self):
638
        url = join_urls(self.pithos_path, self.user, 'apples')
639
        params = {'format': 'json', 'limit': 2, 'prefix': 'photos/animals',
640
                  'delimiter': '/'}
641
        r = self.get('%s?%s' % (url, urlencode(params)))
642
        self.assertEqual(r.status_code, 200)
643
        try:
644
            objects = json.loads(r.content)
645
        except:
646
            self.fail('json format expected')
647
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
648
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
649

    
650
    def test_extended_list_xml(self):
651
        url = join_urls(self.pithos_path, self.user, 'apples')
652
        params = {'format': 'xml', 'limit': 4, 'prefix': 'photos',
653
                  'delimiter': '/'}
654
        r = self.get('%s?%s' % (url, urlencode(params)))
655
        self.assertEqual(r.status_code, 200)
656
        try:
657
            xml = minidom.parseString(r.content)
658
        except:
659
            self.fail('xml format expected')
660
        self.assert_extended(xml, 'xml', 'object', size=4)
661
        dirs = xml.getElementsByTagName('subdir')
662
        self.assertEqual(len(dirs), 2)
663
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
664
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
665

    
666
        objects = xml.getElementsByTagName('name')
667
        self.assertEqual(len(objects), 1)
668
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
669

    
670
    def test_list_meta_double_matching(self):
671
        # update object meta
672
        cname = 'apples'
673
        container_url = join_urls(self.pithos_path, self.user, cname)
674
        oname = self.objects[cname].keys().pop()
675
        meta = {'quality': 'aaa', 'stock': 'true'}
676
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
677
                       for k, v in meta.iteritems())
678
        object_url = join_urls(container_url, oname)
679
        self.post(object_url, content_type='', **headers)
680

    
681
        # list objects that satisfy the criteria
682
        r = self.get('%s?meta=Quality,Stock' % container_url)
683
        self.assertEqual(r.status_code, 200)
684
        objects = r.content.split('\n')
685
        if '' in objects:
686
            objects.remove('')
687
        self.assertEqual(objects, [oname])
688

    
689
    def test_list_using_meta(self):
690
        # update object meta
691
        cname = 'apples'
692
        container_url = join_urls(self.pithos_path, self.user, cname)
693

    
694
        onames = self.objects[cname].keys()
695
        url = join_urls(container_url, onames[0])
696
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
697
        self.assertEqual(r.status_code, 202)
698

    
699
        url = join_urls(container_url, onames[1])
700
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
701
        self.assertEqual(r.status_code, 202)
702

    
703
        url = join_urls(container_url, onames[2])
704
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
705
        self.assertEqual(r.status_code, 202)
706

    
707
        url = join_urls(container_url, onames[3])
708
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
709
        self.assertEqual(r.status_code, 202)
710

    
711
        # test multiple existence criteria matches
712
        r = self.get('%s?meta=Quality,Stock' % container_url)
713
        self.assertEqual(r.status_code, 200)
714
        objects = r.content.split('\n')
715
        if '' in objects:
716
            objects.remove('')
717
        self.assertTrue(objects, sorted(onames))
718

    
719
        # list objects that satisfy the existence criteria
720
        r = self.get('%s?meta=Stock' % container_url)
721
        self.assertEqual(r.status_code, 200)
722
        objects = r.content.split('\n')
723
        if '' in objects:
724
            objects.remove('')
725
        self.assertTrue(objects, sorted(onames[2:]))
726

    
727
        # test case insensitive existence criteria matching
728
        r = self.get('%s?meta=quality' % container_url)
729
        self.assertEqual(r.status_code, 200)
730
        objects = r.content.split('\n')
731
        if '' in objects:
732
            objects.remove('')
733
        self.assertTrue(objects, sorted(onames[:2]))
734

    
735
        # test do not all existencecriteria match
736
        r = self.get('%s?meta=Quality,Foo' % container_url)
737
        self.assertEqual(r.status_code, 200)
738
        objects = r.content.split('\n')
739
        if '' in objects:
740
            objects.remove('')
741
        self.assertTrue(objects, sorted(onames[:2]))
742

    
743
        # test equals criteria
744
        r = self.get('%s?meta=%s' % (container_url, quote('Quality=aaa')))
745
        self.assertEqual(r.status_code, 200)
746
        objects = r.content.split('\n')
747
        if '' in objects:
748
            objects.remove('')
749
        self.assertTrue(objects, [onames[0]])
750

    
751
        # test not equals criteria
752
        r = self.get('%s?meta=%s' % (container_url, quote('Quality!=aaa')))
753
        self.assertEqual(r.status_code, 200)
754
        objects = r.content.split('\n')
755
        if '' in objects:
756
            objects.remove('')
757
        self.assertTrue(objects, [onames[1]])
758

    
759
        # test lte criteria
760
        r = self.get('%s?meta=%s' % (container_url, quote('Stock<=120')))
761
        self.assertEqual(r.status_code, 200)
762
        objects = r.content.split('\n')
763
        if '' in objects:
764
            objects.remove('')
765
        self.assertTrue(objects, [onames[2]])
766

    
767
        # test gte criteria
768
        r = self.get('%s?meta=%s' % (container_url, quote('Stock>=200')))
769
        self.assertEqual(r.status_code, 200)
770
        objects = r.content.split('\n')
771
        if '' in objects:
772
            objects.remove('')
773
        self.assertTrue(objects, [onames[3]])
774

    
775
    def test_if_modified_since(self):
776
        cname = 'apples'
777
        container_info = self.get_container_info(cname)
778
        last_modified = container_info['Last-Modified']
779
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
780
        t1_formats = map(t1.strftime, DATE_FORMATS)
781

    
782
        # Check not modified
783
        url = join_urls(self.pithos_path, self.user, cname)
784
        for t in t1_formats:
785
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
786
            self.assertEqual(r.status_code, 304)
787

    
788
        # modify account: add container
789
        _time.sleep(1)
790
        oname = self.upload_object(cname)[0]
791

    
792
        # Check modified
793
        objects = self.objects[cname].keys()
794
        objects.append(oname)
795
        for t in t1_formats:
796
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
797
            self.assertEqual(r.status_code, 200)
798
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
799

    
800
        container_info = self.get_container_info(cname)
801
        last_modified = container_info['Last-Modified']
802
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
803
        t2_formats = map(t2.strftime, DATE_FORMATS)
804

    
805
        # modify account: update account meta
806
        _time.sleep(1)
807
        self.update_container_meta(cname, {'foo': 'bar'})
808

    
809
        # Check modified
810
        for t in t2_formats:
811
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
812
            self.assertEqual(r.status_code, 200)
813
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
814

    
815
    def test_if_modified_since_invalid_date(self):
816
        cname = 'apples'
817
        url = join_urls(self.pithos_path, self.user, cname)
818
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
819
        self.assertEqual(r.status_code, 200)
820
        self.assertEqual(r.content.split('\n')[:-1],
821
                         sorted(self.objects['apples'].keys()))
822

    
823
    def test_if_not_modified_since(self):
824
        cname = 'apples'
825
        url = join_urls(self.pithos_path, self.user, cname)
826
        container_info = self.get_container_info(cname)
827
        last_modified = container_info['Last-Modified']
828
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
829

    
830
        # Check unmodified
831
        t1 = t + datetime.timedelta(seconds=1)
832
        t1_formats = map(t1.strftime, DATE_FORMATS)
833
        for t in t1_formats:
834
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
835
            self.assertEqual(r.status_code, 200)
836
            self.assertEqual(
837
                r.content.split('\n')[:-1],
838
                sorted(self.objects['apples']))
839

    
840
        # modify account: add container
841
        _time.sleep(2)
842
        self.upload_object(cname)
843

    
844
        container_info = self.get_container_info(cname)
845
        last_modified = container_info['Last-Modified']
846
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
847
        t2 = t - datetime.timedelta(seconds=1)
848
        t2_formats = map(t2.strftime, DATE_FORMATS)
849

    
850
        # Check modified
851
        for t in t2_formats:
852
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
853
            self.assertEqual(r.status_code, 412)
854

    
855
        # modify account: update account meta
856
        _time.sleep(1)
857
        self.update_container_meta(cname, {'foo': 'bar'})
858

    
859
        container_info = self.get_container_info(cname)
860
        last_modified = container_info['Last-Modified']
861
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
862
        t3 = t - datetime.timedelta(seconds=1)
863
        t3_formats = map(t3.strftime, DATE_FORMATS)
864

    
865
        # Check modified
866
        for t in t3_formats:
867
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
868
            self.assertEqual(r.status_code, 412)
869

    
870
    def test_if_unmodified_since(self):
871
        cname = 'apples'
872
        url = join_urls(self.pithos_path, self.user, cname)
873
        container_info = self.get_container_info(cname)
874
        last_modified = container_info['Last-Modified']
875
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
876
        t = t + datetime.timedelta(seconds=1)
877
        t_formats = map(t.strftime, DATE_FORMATS)
878

    
879
        for tf in t_formats:
880
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
881
            self.assertEqual(r.status_code, 200)
882
            self.assertEqual(
883
                r.content.split('\n')[:-1],
884
                sorted(self.objects['apples']))
885

    
886
    def test_if_unmodified_since_precondition_failed(self):
887
        cname = 'apples'
888
        url = join_urls(self.pithos_path, self.user, cname)
889
        container_info = self.get_container_info(cname)
890
        last_modified = container_info['Last-Modified']
891
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
892
        t = t - datetime.timedelta(seconds=1)
893
        t_formats = map(t.strftime, DATE_FORMATS)
894

    
895
        for tf in t_formats:
896
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
897
            self.assertEqual(r.status_code, 412)
898

    
899

    
900
class ContainerPut(PithosAPITest):
901
    def test_create(self):
902
        self.create_container('c1')
903
        self.list_containers()
904
        self.assertTrue('c1' in self.list_containers(format=None))
905

    
906
    def test_create_twice(self):
907
        self.create_container('c1')
908
        self.assertTrue('c1' in self.list_containers(format=None))
909
        r = self.create_container('c1')[-1]
910
        self.assertEqual(r.status_code, 202)
911
        self.assertTrue('c1' in self.list_containers(format=None))
912

    
913

    
914
class ContainerPost(PithosAPITest):
915
    def test_update_meta(self):
916
        cname = 'apples'
917
        self.create_container(cname)
918
        meta = {'test': 'test33', 'tost': 'tost22'}
919
        self.update_container_meta(cname, meta)
920
        info = self.get_container_info(cname)
921
        for k, v in meta.items():
922
            k = 'x-container-meta-%s' % k
923
            self.assertTrue(k in info)
924
            self.assertEqual(info[k], v)
925

    
926
    def test_quota(self):
927
        self.create_container('c1')
928

    
929
        url = join_urls(self.pithos_path, self.user, 'c1')
930
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
931
        self.assertEqual(r.status_code, 202)
932

    
933
        info = self.get_container_info('c1')
934
        self.assertTrue('x-container-policy-quota' in info)
935
        self.assertEqual(info['x-container-policy-quota'], '100')
936

    
937
        r = self.upload_object('c1', length=101, verify_status=False)[2]
938
        self.assertEqual(r.status_code, 413)
939

    
940
        url = join_urls(self.pithos_path, self.user, 'c1')
941
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='0')
942
        self.assertEqual(r.status_code, 202)
943

    
944
        r = self.upload_object('c1', length=1)
945

    
946
    def test_upload_blocks(self):
947
        cname = self.create_container()[0]
948

    
949
        url = join_urls(self.pithos_path, self.user, cname)
950
        r = self.post(url, data=get_random_data())
951
        self.assertEqual(r.status_code, 202)
952

    
953
        url = join_urls(self.pithos_path, 'chuck', cname)
954
        r = self.post(url, data=get_random_data())
955
        self.assertEqual(r.status_code, 403)
956

    
957
        # share object for read only
958
        oname = self.upload_object(cname)[0]
959
        url = join_urls(self.pithos_path, self.user, cname, oname)
960
        self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
961
                  HTTP_X_OBJECT_SHARING='read=*')
962
        url = join_urls(self.pithos_path, 'chuck', cname)
963
        r = self.post(url, data=get_random_data())
964
        self.assertEqual(r.status_code, 403)
965

    
966
        # share object for write only
967
        oname = self.upload_object(cname)[0]
968
        url = join_urls(self.pithos_path, self.user, cname, oname)
969
        self.post(url, content_type='', HTTP_CONTENT_RANGE='bytes */*',
970
                  HTTP_X_OBJECT_SHARING='write=*')
971
        url = join_urls(self.pithos_path, 'chuck', cname)
972
        r = self.post(url, data=get_random_data())
973
        self.assertEqual(r.status_code, 403)
974

    
975

    
976
class ContainerDelete(PithosAPITest):
977
    def setUp(self):
978
        PithosAPITest.setUp(self)
979
        cnames = ['c1', 'c2']
980

    
981
        for c in cnames:
982
            self.create_container(c)
983

    
984
    def test_delete(self):
985
        url = join_urls(self.pithos_path, self.user, 'c1')
986
        r = self.delete(url)
987
        self.assertEqual(r.status_code, 204)
988
        self.assertTrue('c1' not in self.list_containers(format=None))
989

    
990
    def test_delete_non_empty(self):
991
        self.upload_object('c1')
992
        url = join_urls(self.pithos_path, self.user, 'c1')
993
        r = self.delete(url)
994
        self.assertEqual(r.status_code, 409)
995
        self.assertTrue('c1' in self.list_containers(format=None))
996

    
997
    def test_delete_invalid(self):
998
        url = join_urls(self.pithos_path, self.user, 'c3')
999
        r = self.delete(url)
1000
        self.assertEqual(r.status_code, 404)
1001

    
1002
    def test_delete_contents(self):
1003
        folder = self.create_folder('c1')[0]
1004
        descendant = strnextling(folder)
1005
        self.upload_object('c1', descendant)
1006
        self.create_folder('c1', '%s/%s' % (folder, get_random_data(5)))[0]
1007

    
1008
        self.delete('%s?delimiter=/' % join_urls(
1009
            self.pithos_path, self.user, 'c1'))
1010
        self.assertEqual([], self.list_objects('c1'))
1011
        self.assertTrue('c1' in self.list_containers(format=None))