Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / containers.py @ 0c6ab9df

History | View | Annotate | Download (36.5 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data, get_random_name
40

    
41
from synnefo.lib import join_urls
42

    
43
import django.utils.simplejson as json
44
from django.http import urlencode
45

    
46
from xml.dom import minidom
47
from urllib import quote
48
import time as _time
49

    
50
import random
51
import datetime
52

    
53

    
54
class ContainerHead(PithosAPITest):
55
    def test_get_meta(self):
56
        self.create_container('apples')
57

    
58
        # populate with objects
59
        objects = {}
60
        for i in range(random.randint(1, 100)):
61

    
62
            # upload object
63
            meta = {'foo%s' % i: 'bar'}
64
            name, data, resp = self.upload_object('apples', **meta)
65
            objects[name] = data
66

    
67
        t1 = datetime.datetime.utcnow()
68
        url = join_urls(self.pithos_path, self.user, 'apples')
69
        r = self.head(url)
70
        self.assertEqual(int(r['X-Container-Object-Count']), len(objects))
71
        self.assertEqual(int(r['X-Container-Bytes-Used']),
72
                         sum([len(i) for i in objects.values()]))
73
        self.assertTrue('X-Container-Block-Size' in r)
74
        self.assertTrue('X-Container-Block-Hash' in r)
75
        self.assertTrue('X-Container-Until-Timestamp' not in r)
76
        self.assertEqual(r['X-Container-Policy-Versioning'], 'auto')
77
        self.assertEqual(int(r['X-Container-Policy-Quota']), 0)
78
        t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
79
        delta = (t2 - t1)
80
        threashold = datetime.timedelta(seconds=1)
81
        self.assertTrue(delta < threashold)
82
        self.assertTrue(r['X-Container-Object-Meta'])
83
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
84
            for i in range(len(objects)))
85

    
86
    def test_get_container_meta_until(self):
87
        self.create_container('apples')
88

    
89
        # populate with objects
90
        objects = {}
91
        metalist = []
92
        for i in range(random.randint(1, 100)):
93
            # upload object
94
            metakey = 'Foo%s' % i
95
            meta = {metakey: 'bar'}
96
            name, data, resp = self.upload_object('apples', **meta)
97
            objects[name] = data
98
            metalist.append(metakey)
99

    
100
        self.update_container_meta('apples', {'foo': 'bar'})
101

    
102
        container_info = self.get_container_info('apples')
103
        t = datetime.datetime.strptime(container_info['Last-Modified'],
104
                                       DATE_FORMATS[2])
105
        t1 = t + datetime.timedelta(seconds=1)
106
        until = int(_time.mktime(t1.timetuple()))
107

    
108
        _time.sleep(2)
109

    
110
        for i in range(random.randint(1, 100)):
111
            # upload object
112
            meta = {'foo%s' % i: 'bar'}
113
            self.upload_object('apples', **meta)
114

    
115
        self.update_container_meta('apples', {'quality': 'AAA'})
116

    
117
        container_info = self.get_container_info('apples')
118
        self.assertTrue('X-Container-Meta-Quality' in container_info)
119
        self.assertTrue('X-Container-Meta-Foo' in container_info)
120
        self.assertTrue('X-Container-Object-Count' in container_info)
121
        self.assertTrue(int(container_info['X-Container-Object-Count']) >
122
                        len(objects))
123
        self.assertTrue('X-Container-Bytes-Used' in container_info)
124

    
125
        t = datetime.datetime.strptime(container_info['Last-Modified'],
126
                                       DATE_FORMATS[-1])
127
        last_modified = int(_time.mktime(t.timetuple()))
128
        assert until < last_modified
129

    
130
        container_info = self.get_container_info('apples', until=until)
131
        self.assertTrue('X-Container-Meta-Quality' not in container_info)
132
        self.assertTrue('X-Container-Meta-Foo' in container_info)
133
        self.assertTrue('X-Container-Until-Timestamp' in container_info)
134
        t = datetime.datetime.strptime(
135
            container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2])
136
        self.assertTrue(int(_time.mktime(t1.timetuple())) <= until)
137
        self.assertTrue('X-Container-Object-Count' in container_info)
138
        self.assertEqual(int(container_info['X-Container-Object-Count']),
139
                         len(objects))
140
        self.assertTrue('X-Container-Bytes-Used' in container_info)
141
        self.assertEqual(int(container_info['X-Container-Bytes-Used']),
142
                         sum([len(data) for data in objects.values()]))
143
        self.assertTrue('X-Container-Object-Meta' in container_info)
144
        self.assertEqual(
145
            sorted(container_info['X-Container-Object-Meta'].split(',')),
146
            sorted(metalist))
147

    
148

    
149
class ContainerGet(PithosAPITest):
150
    def setUp(self):
151
        PithosAPITest.setUp(self)
152

    
153
        self.cnames = ['pears', 'apples']
154
        self.objects = {}
155
        for c in self.cnames:
156
            self.create_container(c)
157

    
158
        self.objects['pears'] = {}
159
        for o in o_names[:8]:
160
            name, data, resp = self.upload_object('pears', o)
161
            self.objects['pears'][name] = data
162
        self.objects['apples'] = {}
163
        for o in o_names[8:]:
164
            name, data, resp = self.upload_object('apples', o)
165
            self.objects['apples'][name] = data
166

    
167
    def test_list_until(self):
168
        account_info = self.get_account_info()
169
        t = datetime.datetime.strptime(account_info['Last-Modified'],
170
                                       DATE_FORMATS[2])
171
        t1 = t + datetime.timedelta(seconds=1)
172
        until = int(_time.mktime(t1.timetuple()))
173

    
174
        _time.sleep(2)
175

    
176
        cname = self.cnames[0]
177
        self.upload_object(cname)
178

    
179
        url = join_urls(self.pithos_path, self.user, cname)
180
        r = self.get('%s?until=%s' % (url, until))
181
        self.assertTrue(r.status_code, 200)
182
        objects = r.content.split('\n')
183
        if '' in objects:
184
            objects.remove('')
185
        self.assertEqual(objects,
186
                         sorted(self.objects[cname].keys()))
187

    
188
        r = self.get('%s?until=%s&format=json' % (url, until))
189
        self.assertTrue(r.status_code, 200)
190
        try:
191
            objects = json.loads(r.content)
192
        except:
193
            self.fail('json format expected')
194
        self.assertEqual([o['name'] for o in objects],
195
                         sorted(self.objects[cname].keys()))
196

    
197
    def test_list_shared(self):
198
        # share an object
199
        cname = self.cnames[0]
200
        onames = self.objects[cname].keys()
201
        shared1 = onames.pop()
202
        url = join_urls(self.pithos_path, self.user, cname, shared1)
203
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
204
        self.assertEqual(r.status_code, 202)
205

    
206
        # share another object
207
        shared2 = onames.pop()
208
        url = join_urls(self.pithos_path, self.user, cname, shared2)
209
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
210
        self.assertEqual(r.status_code, 202)
211

    
212
        # publish another object
213
        public1 = onames.pop()
214
        url = join_urls(self.pithos_path, self.user, cname, public1)
215
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
216
        self.assertEqual(r.status_code, 202)
217

    
218
        # publish another object
219
        public2 = onames.pop()
220
        url = join_urls(self.pithos_path, self.user, cname, public2)
221
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
222
        self.assertEqual(r.status_code, 202)
223

    
224
        # list shared and assert only the shared object is returned
225
        url = join_urls(self.pithos_path, self.user, cname)
226
        r = self.get('%s?shared=' % url)
227
        self.assertEqual(r.status_code, 200)
228
        objects = r.content.split('\n')
229
        if '' in objects:
230
            objects.remove('')
231
        self.assertEqual(sorted([shared1, shared2]), objects)
232

    
233
        # list detailed shared and assert only the shared object is returned
234
        url = join_urls(self.pithos_path, self.user, cname)
235
        r = self.get('%s?shared=&format=json' % url)
236
        self.assertEqual(r.status_code, 200)
237
        try:
238
            objects = json.loads(r.content)
239
        except:
240
            self.fail('json format expected')
241
        l = sorted([shared1, shared2])
242
        i = 0
243
        for name in l:
244
            self.assertEqual(objects[i]['name'], name)
245
            self.assertEqual(objects[i]['bytes'],
246
                             len(self.objects[cname][name]))
247
            self.assertTrue('x_object_sharing' in objects[i])
248
            self.assertTrue('x_object_public' not in objects[i])
249
            i += 1
250

    
251
        # publish the shared object and assert it is still listed in the
252
        # shared objects
253
        url = join_urls(self.pithos_path, self.user, cname, shared1)
254
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
255
        self.assertEqual(r.status_code, 202)
256
        url = join_urls(self.pithos_path, self.user, cname)
257
        r = self.get('%s?shared=&format=json' % url)
258
        self.assertEqual(r.status_code, 200)
259
        try:
260
            objects = json.loads(r.content)
261
        except:
262
            self.fail('json format expected')
263
        i = 0
264
        for name in l:
265
            self.assertEqual(objects[i]['name'], name)
266
            self.assertEqual(objects[i]['bytes'],
267
                             len(self.objects[cname][name]))
268
            self.assertTrue('x_object_sharing' in objects[i])
269
            if name == shared1:
270
                self.assertTrue('x_object_public' in objects[i])
271
            else:
272
                self.assertTrue('x_object_public' not in objects[i])
273
            i += 1
274

    
275
        # create child object
276
        descendant = strnextling(shared1)
277
        self.upload_object(cname, descendant)
278
        # request shared and assert child obejct is not listed
279
        url = join_urls(self.pithos_path, self.user, cname)
280
        r = self.get('%s?shared=' % url)
281
        self.assertEqual(r.status_code, 200)
282
        objects = r.content.split('\n')
283
        if '' in objects:
284
            objects.remove('')
285
        self.assertTrue(shared1 in objects)
286
        self.assertTrue(descendant not in objects)
287

    
288
        # check folder inheritance
289
        folder, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
290
        # create child object
291
        descendant = '%s/%s' % (folder, get_random_name())
292
        self.upload_object(cname, descendant)
293
        # request shared
294
        url = join_urls(self.pithos_path, self.user, cname)
295
        r = self.get('%s?shared=' % url)
296
        self.assertEqual(r.status_code, 200)
297
        objects = r.content.split('\n')
298
        if '' in objects:
299
            objects.remove('')
300
        self.assertTrue(folder in objects)
301
        self.assertTrue(descendant in objects)
302

    
303
    def test_list_public(self):
304
        cname = self.cnames[0]
305
        onames = self.objects[cname].keys()
306

    
307
        # publish an object
308
        public1 = onames.pop()
309
        url = join_urls(self.pithos_path, self.user, cname, public1)
310
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
311
        self.assertEqual(r.status_code, 202)
312

    
313
        # publish another
314
        public2 = onames.pop()
315
        url = join_urls(self.pithos_path, self.user, cname, public2)
316
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
317
        self.assertEqual(r.status_code, 202)
318

    
319
        # share an object
320
        shared1 = onames.pop()
321
        url = join_urls(self.pithos_path, self.user, cname, shared1)
322
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
323
        self.assertEqual(r.status_code, 202)
324

    
325
        # share another
326
        shared2 = onames.pop()
327
        url = join_urls(self.pithos_path, self.user, cname, shared2)
328
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
329
        self.assertEqual(r.status_code, 202)
330

    
331
        # list public and assert only the public object is returned
332
        url = join_urls(self.pithos_path, self.user, cname)
333
        r = self.get('%s?public=' % url)
334
        self.assertEqual(r.status_code, 200)
335
        objects = r.content.split('\n')
336
        if '' in objects:
337
            objects.remove('')
338
        self.assertEqual(sorted([public1, public2]), objects)
339

    
340
        # list detailed public and assert only the public object is returned
341
        url = join_urls(self.pithos_path, self.user, cname)
342
        r = self.get('%s?public=&format=json' % url)
343
        self.assertEqual(r.status_code, 200)
344
        try:
345
            objects = json.loads(r.content)
346
        except:
347
            self.fail('json format expected')
348
        l = sorted([public1, public2])
349
        i = 0
350
        for name in l:
351
            self.assertEqual(objects[i]['name'], name)
352
            self.assertEqual(objects[i]['bytes'],
353
                             len(self.objects[cname][name]))
354
            self.assertTrue('x_object_sharing' not in objects[i])
355
            self.assertTrue('x_object_public' in objects[i])
356
            i += 1
357

    
358
        # share the public object and assert it is still listed in the
359
        # public objects
360
        url = join_urls(self.pithos_path, self.user, cname, public1)
361
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
362
        self.assertEqual(r.status_code, 202)
363
        url = join_urls(self.pithos_path, self.user, cname)
364
        r = self.get('%s?public=&format=json' % url)
365
        self.assertEqual(r.status_code, 200)
366
        try:
367
            objects = json.loads(r.content)
368
        except:
369
            self.fail('json format expected')
370
        i = 0
371
        for name in l:
372
            self.assertEqual(objects[i]['name'], name)
373
            self.assertEqual(objects[i]['bytes'],
374
                             len(self.objects[cname][name]))
375
            if name == public1:
376
                self.assertTrue('x_object_sharing' in objects[i])
377
            else:
378
                self.assertTrue('x_object_sharing' not in objects[i])
379
            i += 1
380

    
381
        url = join_urls(self.pithos_path, self.user, cname)
382

    
383
        # Assert listing the container public contents is forbidden to not
384
        # shared users
385
        r = self.get('%s?public=&format=json' % url, user='bob')
386
        self.assertEqual(r.status_code, 403)
387

    
388
        # Assert forbidden public object listing to shared users
389
        r = self.get('%s?public=&format=json' % url, user='alice')
390
        self.assertEqual(r.status_code, 403)
391

    
392
        # create child object
393
        descendant = strnextling(public1)
394
        self.upload_object(cname, descendant)
395
        # request public and assert child obejct is not listed
396
        r = self.get('%s?public=' % url)
397
        objects = r.content.split('\n')
398
        if '' in objects:
399
            objects.remove('')
400
        self.assertEqual(r.status_code, 200)
401
        self.assertTrue(public1 in objects)
402
        self.assertTrue(descendant not in objects)
403

    
404
        # test folder inheritance
405
        folder, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
406
        # create child object
407
        descendant = '%s/%s' % (folder, get_random_name())
408
        self.upload_object(cname, descendant)
409
        # request public
410
        r = self.get('%s?public=' % url)
411
        self.assertEqual(r.status_code, 200)
412
        objects = r.content.split('\n')
413
        self.assertTrue(folder in objects)
414
        self.assertTrue(descendant not in objects)
415

    
416
    def test_list_shared_public(self):
417
        cname = self.cnames[0]
418
        container_url = join_urls(self.pithos_path, self.user, cname)
419
        onames = self.objects[cname].keys()
420

    
421
        # publish an object
422
        public1 = onames.pop()
423
        url = join_urls(container_url, public1)
424
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
425
        self.assertEqual(r.status_code, 202)
426

    
427
        # publish another
428
        public2 = onames.pop()
429
        url = join_urls(container_url, public2)
430
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
431
        self.assertEqual(r.status_code, 202)
432

    
433
        # share an object
434
        shared1 = onames.pop()
435
        url = join_urls(container_url, shared1)
436
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
437
        self.assertEqual(r.status_code, 202)
438

    
439
        # share another
440
        shared2 = onames.pop()
441
        url = join_urls(container_url, shared2)
442
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
443
        self.assertEqual(r.status_code, 202)
444

    
445
        # list shared and public objects and assert object is listed
446
        r = self.get('%s?shared=&public=&format=json' % container_url)
447
        self.assertEqual(r.status_code, 200)
448
        objects = json.loads(r.content)
449
        l = sorted([public1, public2, shared1, shared2])
450
        i = 0
451
        for name in l:
452
            self.assertEqual(objects[i]['name'], name)
453
            self.assertEqual(objects[i]['bytes'],
454
                             len(self.objects[cname][name]))
455
            self.assertTrue('x_object_sharing' in objects[i] or
456
                            'x_object_public' in objects[i])
457
            i += 1
458

    
459
        # assert not listing shared and public to a not shared user
460
        r = self.get('%s?shared=&public=&format=json' % container_url,
461
                     user='bob')
462
        self.assertEqual(r.status_code, 403)
463

    
464
        # assert not listing public to a shared user
465
        r = self.get('%s?shared=&public=&format=json' % container_url,
466
                     user='alice')
467
        self.assertEqual(r.status_code, 403)
468

    
469
        # create child object
470
        descendant = strnextling(public1)
471
        self.upload_object(cname, descendant)
472
        # request public and assert child obejct is not listed
473
        r = self.get('%s?shared=&public=' % container_url)
474
        objects = r.content.split('\n')
475
        if '' in objects:
476
            objects.remove('')
477
        self.assertEqual(r.status_code, 200)
478
        self.assertEqual(objects, l)
479

    
480
        # test folder inheritance
481
        folder, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
482
        # create child object
483
        descendant = '%s/%s' % (folder, get_random_name())
484
        self.upload_object(cname, descendant)
485
        # request public
486
        r = self.get('%s?shared=&public=' % container_url)
487
        self.assertEqual(r.status_code, 200)
488
        objects = r.content.split('\n')
489
        if '' in objects:
490
            objects.remove('')
491
        self.assertTrue(folder in objects)
492
        self.assertTrue(descendant not in objects)
493

    
494
    def test_list_objects(self):
495
        cname = self.cnames[0]
496
        url = join_urls(self.pithos_path, self.user, cname)
497
        r = self.get(url)
498
        self.assertTrue(r.status_code, 200)
499
        objects = r.content.split('\n')
500
        if '' in objects:
501
            objects.remove('')
502
        self.assertEqual(objects, sorted(self.objects[cname].keys()))
503

    
504
    def test_list_objects_containing_slash(self):
505
        self.create_container('test')
506
        self.upload_object('test', quote('/objectname', ''))
507

    
508
        url = join_urls(self.pithos_path, self.user, 'test')
509

    
510
        r = self.get(url)
511
        objects = r.content.split('\n')
512
        if '' in objects:
513
            objects.remove('')
514
        self.assertEqual(objects, ['/objectname'])
515

    
516
        r = self.get('%s?format=json' % url)
517
        try:
518
            objects = json.loads(r.content)
519
        except:
520
            self.fail('json format expected')
521
        self.assertEqual([o['name'] for o in objects], ['/objectname'])
522

    
523
        r = self.get('%s?format=xml' % url)
524
        try:
525
            objects = minidom.parseString(r.content)
526
        except:
527
            self.fail('xml format expected')
528
        self.assertEqual(
529
            [n.firstChild.data for n in objects.getElementsByTagName('name')],
530
            ['/objectname'])
531

    
532
    def test_list_objects_with_limit_marker(self):
533
        cname = self.cnames[0]
534
        url = join_urls(self.pithos_path, self.user, cname)
535
        r = self.get('%s?limit=qwert' % url)
536
        self.assertTrue(r.status_code != 500)
537

    
538
        r = self.get('%s?limit=2' % url)
539
        self.assertEqual(r.status_code, 200)
540
        objects = r.content.split('\n')
541
        if '' in objects:
542
            objects.remove('')
543

    
544
        onames = sorted(self.objects[cname].keys())
545
        self.assertEqual(objects, onames[:2])
546

    
547
        markers = ['How To Win Friends And Influence People.pdf',
548
                   'moms_birthday.jpg']
549
        limit = 4
550
        for m in markers:
551
            r = self.get('%s?limit=%s&marker=%s' % (url, limit, m))
552
            objects = r.content.split('\n')
553
            if '' in objects:
554
                objects.remove('')
555
            start = onames.index(m) + 1
556
            end = start + limit
557
            end = end if len(onames) >= end else len(onames)
558
            self.assertEqual(objects, onames[start:end])
559

    
560
    @pithos_test_settings(API_LIST_LIMIT=10)
561
    def test_list_limit_exceeds(self):
562
        self.create_container('container')
563
        url = join_urls(self.pithos_path, self.user, 'container')
564

    
565
        for _ in range(pithos_settings.API_LIST_LIMIT + 1):
566
            self.upload_object('container')
567

    
568
        r = self.get('%s?format=json' % url)
569
        try:
570
            objects = json.loads(r.content)
571
        except:
572
            self.fail('json format expected')
573
        self.assertEqual(pithos_settings.API_LIST_LIMIT,
574
                         len(objects))
575

    
576
    def test_list_pseudo_hierarchical_folders(self):
577
        url = join_urls(self.pithos_path, self.user, 'apples')
578
        r = self.get('%s?prefix=photos&delimiter=/' % url)
579
        self.assertEqual(r.status_code, 200)
580
        objects = r.content.split('\n')
581
        if '' in objects:
582
            objects.remove('')
583
        self.assertEquals(
584
            ['photos/animals/', 'photos/me.jpg', 'photos/plants/'],
585
            objects)
586

    
587
        r = self.get('%s?prefix=photos/animals&delimiter=/' % url)
588
        objects = r.content.split('\n')
589
        if '' in objects:
590
            objects.remove('')
591
        self.assertEquals(
592
            ['photos/animals/cats/', 'photos/animals/dogs/'], objects)
593

    
594
        r = self.get('%s?path=photos' % url)
595
        objects = r.content.split('\n')
596
        if '' in objects:
597
            objects.remove('')
598
        self.assertEquals(['photos/me.jpg'], objects)
599

    
600
    def test_extended_list_json(self):
601
        url = join_urls(self.pithos_path, self.user, 'apples')
602
        params = {'format': 'json', 'limit': 2, 'prefix': 'photos/animals',
603
                  'delimiter': '/'}
604
        r = self.get('%s?%s' % (url, urlencode(params)))
605
        self.assertEqual(r.status_code, 200)
606
        try:
607
            objects = json.loads(r.content)
608
        except:
609
            self.fail('json format expected')
610
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
611
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
612

    
613
    def test_extended_list_xml(self):
614
        url = join_urls(self.pithos_path, self.user, 'apples')
615
        params = {'format': 'xml', 'limit': 4, 'prefix': 'photos',
616
                  'delimiter': '/'}
617
        r = self.get('%s?%s' % (url, urlencode(params)))
618
        self.assertEqual(r.status_code, 200)
619
        try:
620
            xml = minidom.parseString(r.content)
621
        except:
622
            self.fail('xml format expected')
623
        self.assert_extended(xml, 'xml', 'object', size=4)
624
        dirs = xml.getElementsByTagName('subdir')
625
        self.assertEqual(len(dirs), 2)
626
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
627
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
628

    
629
        objects = xml.getElementsByTagName('name')
630
        self.assertEqual(len(objects), 1)
631
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
632

    
633
    def test_list_meta_double_matching(self):
634
        # update object meta
635
        cname = 'apples'
636
        container_url = join_urls(self.pithos_path, self.user, cname)
637
        oname = self.objects[cname].keys().pop()
638
        meta = {'quality': 'aaa', 'stock': 'true'}
639
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
640
                       for k, v in meta.iteritems())
641
        object_url = join_urls(container_url, oname)
642
        self.post(object_url, content_type='', **headers)
643

    
644
        # list objects that satisfy the criteria
645
        r = self.get('%s?meta=Quality,Stock' % container_url)
646
        self.assertEqual(r.status_code, 200)
647
        objects = r.content.split('\n')
648
        if '' in objects:
649
            objects.remove('')
650
        self.assertEqual(objects, [oname])
651

    
652
    def test_list_using_meta(self):
653
        # update object meta
654
        cname = 'apples'
655
        container_url = join_urls(self.pithos_path, self.user, cname)
656

    
657
        onames = self.objects[cname].keys()
658
        url = join_urls(container_url, onames[0])
659
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
660
        self.assertEqual(r.status_code, 202)
661

    
662
        url = join_urls(container_url, onames[1])
663
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
664
        self.assertEqual(r.status_code, 202)
665

    
666
        url = join_urls(container_url, onames[2])
667
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
668
        self.assertEqual(r.status_code, 202)
669

    
670
        url = join_urls(container_url, onames[3])
671
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
672
        self.assertEqual(r.status_code, 202)
673

    
674
        # test multiple existence criteria matches
675
        r = self.get('%s?meta=Quality,Stock' % container_url)
676
        self.assertEqual(r.status_code, 200)
677
        objects = r.content.split('\n')
678
        if '' in objects:
679
            objects.remove('')
680
        self.assertTrue(objects, sorted(onames))
681

    
682
        # list objects that satisfy the existence criteria
683
        r = self.get('%s?meta=Stock' % container_url)
684
        self.assertEqual(r.status_code, 200)
685
        objects = r.content.split('\n')
686
        if '' in objects:
687
            objects.remove('')
688
        self.assertTrue(objects, sorted(onames[2:]))
689

    
690
        # test case insensitive existence criteria matching
691
        r = self.get('%s?meta=quality' % container_url)
692
        self.assertEqual(r.status_code, 200)
693
        objects = r.content.split('\n')
694
        if '' in objects:
695
            objects.remove('')
696
        self.assertTrue(objects, sorted(onames[:2]))
697

    
698
        # test do not all existencecriteria match
699
        r = self.get('%s?meta=Quality,Foo' % container_url)
700
        self.assertEqual(r.status_code, 200)
701
        objects = r.content.split('\n')
702
        if '' in objects:
703
            objects.remove('')
704
        self.assertTrue(objects, sorted(onames[:2]))
705

    
706
        # test equals criteria
707
        r = self.get('%s?meta=%s' % (container_url, quote('Quality=aaa')))
708
        self.assertEqual(r.status_code, 200)
709
        objects = r.content.split('\n')
710
        if '' in objects:
711
            objects.remove('')
712
        self.assertTrue(objects, [onames[0]])
713

    
714
        # test not equals criteria
715
        r = self.get('%s?meta=%s' % (container_url, quote('Quality!=aaa')))
716
        self.assertEqual(r.status_code, 200)
717
        objects = r.content.split('\n')
718
        if '' in objects:
719
            objects.remove('')
720
        self.assertTrue(objects, [onames[1]])
721

    
722
        # test lte criteria
723
        r = self.get('%s?meta=%s' % (container_url, quote('Stock<=120')))
724
        self.assertEqual(r.status_code, 200)
725
        objects = r.content.split('\n')
726
        if '' in objects:
727
            objects.remove('')
728
        self.assertTrue(objects, [onames[2]])
729

    
730
        # test gte criteria
731
        r = self.get('%s?meta=%s' % (container_url, quote('Stock>=200')))
732
        self.assertEqual(r.status_code, 200)
733
        objects = r.content.split('\n')
734
        if '' in objects:
735
            objects.remove('')
736
        self.assertTrue(objects, [onames[3]])
737

    
738
    def test_if_modified_since(self):
739
        cname = 'apples'
740
        container_info = self.get_container_info(cname)
741
        last_modified = container_info['Last-Modified']
742
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
743
        t1_formats = map(t1.strftime, DATE_FORMATS)
744

    
745
        # Check not modified
746
        url = join_urls(self.pithos_path, self.user, cname)
747
        for t in t1_formats:
748
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
749
            self.assertEqual(r.status_code, 304)
750

    
751
        # modify account: add container
752
        _time.sleep(1)
753
        oname = self.upload_object(cname)[0]
754

    
755
        # Check modified
756
        objects = self.objects[cname].keys()
757
        objects.append(oname)
758
        for t in t1_formats:
759
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
760
            self.assertEqual(r.status_code, 200)
761
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
762

    
763
        container_info = self.get_container_info(cname)
764
        last_modified = container_info['Last-Modified']
765
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
766
        t2_formats = map(t2.strftime, DATE_FORMATS)
767

    
768
        # modify account: update account meta
769
        _time.sleep(1)
770
        self.update_container_meta(cname, {'foo': 'bar'})
771

    
772
        # Check modified
773
        for t in t2_formats:
774
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
775
            self.assertEqual(r.status_code, 200)
776
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
777

    
778
    def test_if_modified_since_invalid_date(self):
779
        cname = 'apples'
780
        url = join_urls(self.pithos_path, self.user, cname)
781
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
782
        self.assertEqual(r.status_code, 200)
783
        self.assertEqual(r.content.split('\n')[:-1],
784
                         sorted(self.objects['apples'].keys()))
785

    
786
    def test_if_not_modified_since(self):
787
        cname = 'apples'
788
        url = join_urls(self.pithos_path, self.user, cname)
789
        container_info = self.get_container_info(cname)
790
        last_modified = container_info['Last-Modified']
791
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
792

    
793
        # Check unmodified
794
        t1 = t + datetime.timedelta(seconds=1)
795
        t1_formats = map(t1.strftime, DATE_FORMATS)
796
        for t in t1_formats:
797
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
798
            self.assertEqual(r.status_code, 200)
799
            self.assertEqual(
800
                r.content.split('\n')[:-1],
801
                sorted(self.objects['apples']))
802

    
803
        # modify account: add container
804
        _time.sleep(2)
805
        self.upload_object(cname)
806

    
807
        container_info = self.get_container_info(cname)
808
        last_modified = container_info['Last-Modified']
809
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
810
        t2 = t - datetime.timedelta(seconds=1)
811
        t2_formats = map(t2.strftime, DATE_FORMATS)
812

    
813
        # Check modified
814
        for t in t2_formats:
815
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
816
            self.assertEqual(r.status_code, 412)
817

    
818
        # modify account: update account meta
819
        _time.sleep(1)
820
        self.update_container_meta(cname, {'foo': 'bar'})
821

    
822
        container_info = self.get_container_info(cname)
823
        last_modified = container_info['Last-Modified']
824
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
825
        t3 = t - datetime.timedelta(seconds=1)
826
        t3_formats = map(t3.strftime, DATE_FORMATS)
827

    
828
        # Check modified
829
        for t in t3_formats:
830
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
831
            self.assertEqual(r.status_code, 412)
832

    
833
    def test_if_unmodified_since(self):
834
        cname = 'apples'
835
        url = join_urls(self.pithos_path, self.user, cname)
836
        container_info = self.get_container_info(cname)
837
        last_modified = container_info['Last-Modified']
838
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
839
        t = t + datetime.timedelta(seconds=1)
840
        t_formats = map(t.strftime, DATE_FORMATS)
841

    
842
        for tf in t_formats:
843
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
844
            self.assertEqual(r.status_code, 200)
845
            self.assertEqual(
846
                r.content.split('\n')[:-1],
847
                sorted(self.objects['apples']))
848

    
849
    def test_if_unmodified_since_precondition_failed(self):
850
        cname = 'apples'
851
        url = join_urls(self.pithos_path, self.user, cname)
852
        container_info = self.get_container_info(cname)
853
        last_modified = container_info['Last-Modified']
854
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
855
        t = t - datetime.timedelta(seconds=1)
856
        t_formats = map(t.strftime, DATE_FORMATS)
857

    
858
        for tf in t_formats:
859
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
860
            self.assertEqual(r.status_code, 412)
861

    
862

    
863
class ContainerPut(PithosAPITest):
864
    def test_create(self):
865
        self.create_container('c1')
866
        self.list_containers()
867
        self.assertTrue('c1' in self.list_containers(format=None))
868

    
869
    def test_create_twice(self):
870
        self.create_container('c1')
871
        self.assertTrue('c1' in self.list_containers(format=None))
872
        r = self.create_container('c1')[-1]
873
        self.assertEqual(r.status_code, 202)
874
        self.assertTrue('c1' in self.list_containers(format=None))
875

    
876

    
877
class ContainerPost(PithosAPITest):
878
    def test_update_meta(self):
879
        cname = 'apples'
880
        self.create_container(cname)
881
        meta = {'test': 'test33', 'tost': 'tost22'}
882
        self.update_container_meta(cname, meta)
883
        info = self.get_container_info(cname)
884
        for k, v in meta.items():
885
            k = 'x-container-meta-%s' % k
886
            self.assertTrue(k in info)
887
            self.assertEqual(info[k], v)
888

    
889
    def test_quota(self):
890
        self.create_container('c1')
891

    
892
        url = join_urls(self.pithos_path, self.user, 'c1')
893
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
894
        self.assertEqual(r.status_code, 202)
895

    
896
        info = self.get_container_info('c1')
897
        self.assertTrue('x-container-policy-quota' in info)
898
        self.assertEqual(info['x-container-policy-quota'], '100')
899

    
900
        r = self.upload_object('c1', length=101, verify_status=False)[2]
901
        self.assertEqual(r.status_code, 413)
902

    
903
        url = join_urls(self.pithos_path, self.user, 'c1')
904
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='0')
905
        self.assertEqual(r.status_code, 202)
906

    
907
        r = self.upload_object('c1', length=1)
908

    
909

    
910
class ContainerDelete(PithosAPITest):
911
    def setUp(self):
912
        PithosAPITest.setUp(self)
913
        cnames = ['c1', 'c2']
914

    
915
        for c in cnames:
916
            self.create_container(c)
917

    
918
    def test_delete(self):
919
        url = join_urls(self.pithos_path, self.user, 'c1')
920
        r = self.delete(url)
921
        self.assertEqual(r.status_code, 204)
922
        self.assertTrue('c1' not in self.list_containers(format=None))
923

    
924
    def test_delete_non_empty(self):
925
        self.upload_object('c1')
926
        url = join_urls(self.pithos_path, self.user, 'c1')
927
        r = self.delete(url)
928
        self.assertEqual(r.status_code, 409)
929
        self.assertTrue('c1' in self.list_containers(format=None))
930

    
931
    def test_delete_invalid(self):
932
        url = join_urls(self.pithos_path, self.user, 'c3')
933
        r = self.delete(url)
934
        self.assertEqual(r.status_code, 404)
935

    
936
    @pithos_test_settings(API_LIST_LIMIT=10)
937
    def test_delete_contents(self):
938
        folder = self.create_folder('c1')[0]
939
        for i in range(11):
940
            descendant = '%s_%d' % (strnextling(folder), i)
941
            self.upload_object('c1', descendant)
942
        self.create_folder('c1', '%s/%s' % (folder, get_random_data(5)))[0]
943

    
944
        r = self.delete('%s?delimiter=/' % join_urls(
945
            self.pithos_path, self.user, 'c1'))
946
        self.assertEqual(r.status_code, 204)
947
        self.assertEqual([], self.list_objects('c1'))
948
        self.assertTrue('c1' in self.list_containers(format=None))