Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / containers.py @ 3a19e99b

History | View | Annotate | Download (30.2 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data
40

    
41
from synnefo.lib import join_urls
42

    
43
import django.utils.simplejson as json
44
from django.http import urlencode
45

    
46
from xml.dom import minidom
47
from urllib import quote
48
import time as _time
49

    
50
import random
51
import datetime
52

    
53

    
54
class ContainerHead(PithosAPITest):
55
    def test_get_meta(self):
56
        self.create_container('apples')
57

    
58
        # populate with objects
59
        objects = {}
60
        for i in range(random.randint(1, 100)):
61

    
62
            # upload object
63
            meta = {'foo%s' % i: 'bar'}
64
            name, data, resp = self.upload_object('apples', **meta)
65
            objects[name] = data
66

    
67
        t1 = datetime.datetime.utcnow()
68
        url = join_urls(self.pithos_path, self.user, 'apples')
69
        r = self.head(url)
70
        self.assertEqual(int(r['X-Container-Object-Count']), len(objects))
71
        self.assertEqual(int(r['X-Container-Bytes-Used']),
72
                         sum([len(i) for i in objects.values()]))
73
        self.assertTrue('X-Container-Block-Size' in r)
74
        self.assertTrue('X-Container-Block-Hash' in r)
75
        self.assertTrue('X-Container-Until-Timestamp' not in r)
76
        self.assertEqual(r['X-Container-Policy-Versioning'], 'auto')
77
        self.assertEqual(int(r['X-Container-Policy-Quota']), 0)
78
        t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
79
        delta = (t2 - t1)
80
        threashold = datetime.timedelta(seconds=1)
81
        self.assertTrue(delta < threashold)
82
        self.assertTrue(r['X-Container-Object-Meta'])
83
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
84
            for i in range(len(objects)))
85

    
86

    
87
class ContainerGet(PithosAPITest):
88
    def setUp(self):
89
        PithosAPITest.setUp(self)
90

    
91
        self.cnames = ['pears', 'apples']
92
        self.objects = {}
93
        for c in self.cnames:
94
            self.create_container(c)
95

    
96
        self.objects['pears'] = {}
97
        for o in o_names[:8]:
98
            name, data, resp = self.upload_object('pears', o)
99
            self.objects['pears'][name] = data
100
        self.objects['apples'] = {}
101
        for o in o_names[8:]:
102
            name, data, resp = self.upload_object('apples', o)
103
            self.objects['apples'][name] = data
104

    
105
    def test_list_shared(self):
106
        # share an object
107
        cname = self.cnames[0]
108
        onames = self.objects[cname].keys()
109
        oname = onames.pop()
110
        url = join_urls(self.pithos_path, self.user, cname, oname)
111
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
112
        self.assertEqual(r.status_code, 202)
113

    
114
        # publish another object
115
        other = onames.pop()
116
        url = join_urls(self.pithos_path, self.user, cname, other)
117
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
118
        self.assertEqual(r.status_code, 202)
119

    
120
        # list shared and assert only the shared object is returned
121
        url = join_urls(self.pithos_path, self.user, cname)
122
        r = self.get('%s?shared=' % url)
123
        self.assertEqual(r.status_code, 200)
124
        objects = r.content.split('\n')
125
        if '' in objects:
126
            objects.remove('')
127
        self.assertEqual([oname], objects)
128

    
129
        # list detailed shared and assert only the shared object is returned
130
        url = join_urls(self.pithos_path, self.user, cname)
131
        r = self.get('%s?shared=&format=json' % url)
132
        self.assertEqual(r.status_code, 200)
133
        try:
134
            objects = json.loads(r.content)
135
        except:
136
            self.fail('json format expected')
137
        self.assertEqual([oname], [o['name'] for o in objects])
138
        self.assertTrue('x_object_sharing' in objects[0])
139
        self.assertTrue('x_object_public' not in objects[0])
140

    
141
        # publish the shared object and assert it is still listed in the
142
        # shared objects
143
        url = join_urls(self.pithos_path, self.user, cname, oname)
144
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
145
        self.assertEqual(r.status_code, 202)
146
        url = join_urls(self.pithos_path, self.user, cname)
147
        r = self.get('%s?shared=&format=json' % url)
148
        self.assertEqual(r.status_code, 200)
149
        try:
150
            objects = json.loads(r.content)
151
        except:
152
            self.fail('json format expected')
153
        self.assertEqual([oname], [o['name'] for o in objects])
154
        self.assertTrue('x_object_sharing' in objects[0])
155
        self.assertTrue('x_object_public' in objects[0])
156

    
157
        # create child object
158
        descendant = strnextling(oname)
159
        self.upload_object(cname, descendant)
160
        # request shared and assert child obejct is not listed
161
        url = join_urls(self.pithos_path, self.user, cname)
162
        r = self.get('%s?shared=' % url)
163
        self.assertEqual(r.status_code, 200)
164
        objects = r.content.split('\n')
165
        if '' in objects:
166
            objects.remove('')
167
        self.assertTrue(oname in objects)
168
        self.assertTrue(descendant not in objects)
169

    
170
        # check folder inheritance
171
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
172
        # create child object
173
        descendant = '%s/%s' % (oname, get_random_data(8))
174
        self.upload_object(cname, descendant)
175
        # request shared
176
        url = join_urls(self.pithos_path, self.user, cname)
177
        r = self.get('%s?shared=' % url)
178
        self.assertEqual(r.status_code, 200)
179
        objects = r.content.split('\n')
180
        if '' in objects:
181
            objects.remove('')
182
        self.assertTrue(oname in objects)
183
        self.assertTrue(descendant in objects)
184

    
185
    def test_list_public(self):
186
        # publish an object
187
        cname = self.cnames[0]
188
        onames = self.objects[cname].keys()
189
        oname = onames.pop()
190
        other = onames.pop()
191

    
192
        # publish an object
193
        url = join_urls(self.pithos_path, self.user, cname, oname)
194
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
195
        self.assertEqual(r.status_code, 202)
196

    
197
        # share another
198
        url = join_urls(self.pithos_path, self.user, cname, other)
199
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
200
        self.assertEqual(r.status_code, 202)
201

    
202
        # list public and assert only the public object is returned
203
        url = join_urls(self.pithos_path, self.user, cname)
204
        r = self.get('%s?public=' % url)
205
        objects = r.content.split('\n')
206
        self.assertEqual(r.status_code, 200)
207
        self.assertTrue(oname in r.content.split('\n'))
208
        (self.assertTrue(object not in objects) for object in o_names[1:])
209

    
210
        # list detailed public and assert only the public object is returned
211
        url = join_urls(self.pithos_path, self.user, cname)
212
        r = self.get('%s?public=&format=json' % url)
213
        self.assertEqual(r.status_code, 200)
214
        try:
215
            objects = json.loads(r.content)
216
        except:
217
            self.fail('json format expected')
218
        self.assertEqual([oname], [obj['name'] for obj in objects])
219
        self.assertTrue('x_object_sharing' not in objects[0])
220
        self.assertTrue('x_object_public' in objects[0])
221

    
222
        # share the public object and assert it is still listed in the
223
        # public objects
224
        url = join_urls(self.pithos_path, self.user, cname, oname)
225
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
226
        self.assertEqual(r.status_code, 202)
227
        url = join_urls(self.pithos_path, self.user, cname)
228
        r = self.get('%s?public=&format=json' % url)
229
        self.assertEqual(r.status_code, 200)
230
        try:
231
            objects = json.loads(r.content)
232
        except:
233
            self.fail('json format expected')
234
        self.assertEqual([oname], [obj['name'] for obj in objects])
235
        self.assertTrue('x_object_sharing' in objects[0])
236
        self.assertTrue('x_object_public' in objects[0])
237

    
238
        url = join_urls(self.pithos_path, self.user, cname)
239

    
240
        # Assert listing the container public contents is forbidden to not
241
        # shared users
242
        r = self.get('%s?public=&format=json' % url, user='bob')
243
        self.assertEqual(r.status_code, 403)
244

    
245
        # Assert forbidden public object listing to shared users
246
        r = self.get('%s?public=&format=json' % url, user='alice')
247
        self.assertEqual(r.status_code, 403)
248

    
249
        # create child object
250
        descendant = strnextling(oname)
251
        self.upload_object(cname, descendant)
252
        # request public and assert child obejct is not listed
253
        r = self.get('%s?public=' % url)
254
        objects = r.content.split('\n')
255
        if '' in objects:
256
            objects.remove('')
257
        self.assertEqual(r.status_code, 200)
258
        self.assertTrue(oname in objects)
259
        (self.assertTrue(o not in objects) for o in o_names[1:])
260

    
261
        # test folder inheritance
262
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
263
        # create child object
264
        descendant = '%s/%s' % (oname, get_random_data(8))
265
        self.upload_object(cname, descendant)
266
        # request public
267
        r = self.get('%s?public=' % url)
268
        self.assertEqual(r.status_code, 200)
269
        objects = r.content.split('\n')
270
        self.assertTrue(oname in objects)
271
        self.assertTrue(descendant not in objects)
272

    
273
    def test_list_shared_public(self):
274
        # publish an object
275
        cname = self.cnames[0]
276
        container_url = join_urls(self.pithos_path, self.user, cname)
277
        onames = self.objects[cname].keys()
278
        oname = onames.pop()
279
        url = join_urls(container_url, oname)
280
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
281
        self.assertEqual(r.status_code, 202)
282

    
283
        # share another
284
        other = onames.pop()
285
        url = join_urls(container_url, other)
286
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
287
        self.assertEqual(r.status_code, 202)
288

    
289
        # list shared and public objects and assert object is listed
290
        r = self.get('%s?shared=&public=&format=json' % container_url)
291
        self.assertEqual(r.status_code, 200)
292
        objects = json.loads(r.content)
293
        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
294
        for o in objects:
295
            if o['name'] == oname:
296
                self.assertTrue('x_object_public' in o.keys())
297
            elif o['name'] == other:
298
                self.assertTrue('x_object_sharing' in o.keys())
299

    
300
        # assert not listing shared and public to a not shared user
301
        r = self.get('%s?shared=&public=&format=json' % container_url,
302
                     user='bob')
303
        self.assertEqual(r.status_code, 403)
304

    
305
        # assert not listing public to a shared user
306
        r = self.get('%s?shared=&public=&format=json' % container_url,
307
                     user='alice')
308
        self.assertEqual(r.status_code, 403)
309

    
310
        # create child object
311
        descendant = strnextling(oname)
312
        self.upload_object(cname, descendant)
313
        # request public and assert child obejct is not listed
314
        r = self.get('%s?shared=&public=' % container_url)
315
        objects = r.content.split('\n')
316
        if '' in objects:
317
            objects.remove('')
318
        self.assertEqual(r.status_code, 200)
319
        self.assertTrue(oname in objects)
320
        (self.assertTrue(o not in objects) for o in o_names[1:])
321

    
322
        # test folder inheritance
323
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
324
        # create child object
325
        descendant = '%s/%s' % (oname, get_random_data(8))
326
        self.upload_object(cname, descendant)
327
        # request public
328
        r = self.get('%s?shared=&public=' % container_url)
329
        self.assertEqual(r.status_code, 200)
330
        objects = r.content.split('\n')
331
        if '' in objects:
332
            objects.remove('')
333
        self.assertTrue(oname in objects)
334
        self.assertTrue(descendant not in objects)
335

    
336
    def test_list_objects(self):
337
        cname = self.cnames[0]
338
        url = join_urls(self.pithos_path, self.user, cname)
339
        r = self.get(url)
340
        self.assertTrue(r.status_code, 200)
341
        objects = r.content.split('\n')
342
        if '' in objects:
343
            objects.remove('')
344
        self.assertEqual(objects, sorted(self.objects[cname].keys()))
345

    
346
    def test_list_objects_containing_slash(self):
347
        self.create_container('test')
348
        self.upload_object('test', quote('/objectname', ''))
349

    
350
        url = join_urls(self.pithos_path, self.user, 'test')
351

    
352
        r = self.get(url)
353
        objects = r.content.split('\n')
354
        if '' in objects:
355
            objects.remove('')
356
        self.assertEqual(objects, ['/objectname'])
357

    
358
        r = self.get('%s?format=json' % url)
359
        try:
360
            objects = json.loads(r.content)
361
        except:
362
            self.fail('json format expected')
363
        self.assertEqual([o['name'] for o in objects], ['/objectname'])
364

    
365
        r = self.get('%s?format=xml' % url)
366
        try:
367
            objects = minidom.parseString(r.content)
368
        except:
369
            self.fail('xml format expected')
370
        self.assertEqual(
371
            [n.firstChild.data for n in objects.getElementsByTagName('name')],
372
            ['/objectname'])
373

    
374
    def test_list_objects_with_limit_marker(self):
375
        cname = self.cnames[0]
376
        url = join_urls(self.pithos_path, self.user, cname)
377
        r = self.get('%s?limit=qwert' % url)
378
        self.assertTrue(r.status_code != 500)
379

    
380
        r = self.get('%s?limit=2' % url)
381
        self.assertEqual(r.status_code, 200)
382
        objects = r.content.split('\n')
383
        if '' in objects:
384
            objects.remove('')
385

    
386
        onames = sorted(self.objects[cname].keys())
387
        self.assertEqual(objects, onames[:2])
388

    
389
        markers = ['How To Win Friends And Influence People.pdf',
390
                   'moms_birthday.jpg']
391
        limit = 4
392
        for m in markers:
393
            r = self.get('%s?limit=%s&marker=%s' % (url, limit, m))
394
            objects = r.content.split('\n')
395
            if '' in objects:
396
                objects.remove('')
397
            start = onames.index(m) + 1
398
            end = start + limit
399
            end = end if len(onames) >= end else len(onames)
400
            self.assertEqual(objects, onames[start:end])
401

    
402
    @pithos_test_settings(API_LIST_LIMIT=10)
403
    def test_list_limit_exceeds(self):
404
        self.create_container('container')
405
        url = join_urls(self.pithos_path, self.user, 'container')
406

    
407
        for _ in range(pithos_settings.API_LIST_LIMIT + 1):
408
            self.upload_object('container')
409

    
410
        r = self.get('%s?format=json' % url)
411
        try:
412
            objects = json.loads(r.content)
413
        except:
414
            self.fail('json format expected')
415
        self.assertEqual(pithos_settings.API_LIST_LIMIT,
416
                         len(objects))
417

    
418
    def test_list_pseudo_hierarchical_folders(self):
419
        url = join_urls(self.pithos_path, self.user, 'apples')
420
        r = self.get('%s?prefix=photos&delimiter=/' % url)
421
        self.assertEqual(r.status_code, 200)
422
        objects = r.content.split('\n')
423
        if '' in objects:
424
            objects.remove('')
425
        self.assertEquals(
426
            ['photos/animals/', 'photos/me.jpg', 'photos/plants/'],
427
            objects)
428

    
429
        r = self.get('%s?prefix=photos/animals&delimiter=/' % url)
430
        objects = r.content.split('\n')
431
        if '' in objects:
432
            objects.remove('')
433
        self.assertEquals(
434
            ['photos/animals/cats/', 'photos/animals/dogs/'], objects)
435

    
436
        r = self.get('%s?path=photos' % url)
437
        objects = r.content.split('\n')
438
        if '' in objects:
439
            objects.remove('')
440
        self.assertEquals(['photos/me.jpg'], objects)
441

    
442
    def test_extended_list_json(self):
443
        url = join_urls(self.pithos_path, self.user, 'apples')
444
        params = {'format': 'json', 'limit': 2, 'prefix': 'photos/animals',
445
                  'delimiter': '/'}
446
        r = self.get('%s?%s' % (url, urlencode(params)))
447
        self.assertEqual(r.status_code, 200)
448
        try:
449
            objects = json.loads(r.content)
450
        except:
451
            self.fail('json format expected')
452
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
453
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
454

    
455
    def test_extended_list_xml(self):
456
        url = join_urls(self.pithos_path, self.user, 'apples')
457
        params = {'format': 'xml', 'limit': 4, 'prefix': 'photos',
458
                  'delimiter': '/'}
459
        r = self.get('%s?%s' % (url, urlencode(params)))
460
        self.assertEqual(r.status_code, 200)
461
        try:
462
            xml = minidom.parseString(r.content)
463
        except:
464
            self.fail('xml format expected')
465
        self.assert_extended(xml, 'xml', 'object', size=4)
466
        dirs = xml.getElementsByTagName('subdir')
467
        self.assertEqual(len(dirs), 2)
468
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
469
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
470

    
471
        objects = xml.getElementsByTagName('name')
472
        self.assertEqual(len(objects), 1)
473
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
474

    
475
    def test_list_meta_double_matching(self):
476
        # update object meta
477
        cname = 'apples'
478
        container_url = join_urls(self.pithos_path, self.user, cname)
479
        oname = self.objects[cname].keys().pop()
480
        meta = {'quality': 'aaa', 'stock': 'true'}
481
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
482
                       for k, v in meta.iteritems())
483
        object_url = join_urls(container_url, oname)
484
        self.post(object_url, content_type='', **headers)
485

    
486
        # list objects that satisfy the criteria
487
        r = self.get('%s?meta=Quality,Stock' % container_url)
488
        self.assertEqual(r.status_code, 200)
489
        objects = r.content.split('\n')
490
        if '' in objects:
491
            objects.remove('')
492
        self.assertEqual(objects, [oname])
493

    
494
    def test_list_using_meta(self):
495
        # update object meta
496
        cname = 'apples'
497
        container_url = join_urls(self.pithos_path, self.user, cname)
498

    
499
        onames = self.objects[cname].keys()
500
        url = join_urls(container_url, onames[0])
501
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
502
        self.assertEqual(r.status_code, 202)
503

    
504
        url = join_urls(container_url, onames[1])
505
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
506
        self.assertEqual(r.status_code, 202)
507

    
508
        url = join_urls(container_url, onames[2])
509
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
510
        self.assertEqual(r.status_code, 202)
511

    
512
        url = join_urls(container_url, onames[3])
513
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
514
        self.assertEqual(r.status_code, 202)
515

    
516
        # test multiple existence criteria matches
517
        r = self.get('%s?meta=Quality,Stock' % container_url)
518
        self.assertEqual(r.status_code, 200)
519
        objects = r.content.split('\n')
520
        if '' in objects:
521
            objects.remove('')
522
        self.assertTrue(objects, sorted(onames))
523

    
524
        # list objects that satisfy the existence criteria
525
        r = self.get('%s?meta=Stock' % container_url)
526
        self.assertEqual(r.status_code, 200)
527
        objects = r.content.split('\n')
528
        if '' in objects:
529
            objects.remove('')
530
        self.assertTrue(objects, sorted(onames[2:]))
531

    
532
        # test case insensitive existence criteria matching
533
        r = self.get('%s?meta=quality' % container_url)
534
        self.assertEqual(r.status_code, 200)
535
        objects = r.content.split('\n')
536
        if '' in objects:
537
            objects.remove('')
538
        self.assertTrue(objects, sorted(onames[:2]))
539

    
540
        # test do not all existencecriteria match
541
        r = self.get('%s?meta=Quality,Foo' % container_url)
542
        self.assertEqual(r.status_code, 200)
543
        objects = r.content.split('\n')
544
        if '' in objects:
545
            objects.remove('')
546
        self.assertTrue(objects, sorted(onames[:2]))
547

    
548
        # test equals criteria
549
        r = self.get('%s?meta=%s' % (container_url, quote('Quality=aaa')))
550
        self.assertEqual(r.status_code, 200)
551
        objects = r.content.split('\n')
552
        if '' in objects:
553
            objects.remove('')
554
        self.assertTrue(objects, [onames[0]])
555

    
556
        # test not equals criteria
557
        r = self.get('%s?meta=%s' % (container_url, quote('Quality!=aaa')))
558
        self.assertEqual(r.status_code, 200)
559
        objects = r.content.split('\n')
560
        if '' in objects:
561
            objects.remove('')
562
        self.assertTrue(objects, [onames[1]])
563

    
564
        # test lte criteria
565
        r = self.get('%s?meta=%s' % (container_url, quote('Stock<=120')))
566
        self.assertEqual(r.status_code, 200)
567
        objects = r.content.split('\n')
568
        if '' in objects:
569
            objects.remove('')
570
        self.assertTrue(objects, [onames[2]])
571

    
572
        # test gte criteria
573
        r = self.get('%s?meta=%s' % (container_url, quote('Stock>=200')))
574
        self.assertEqual(r.status_code, 200)
575
        objects = r.content.split('\n')
576
        if '' in objects:
577
            objects.remove('')
578
        self.assertTrue(objects, [onames[3]])
579

    
580
    def test_if_modified_since(self):
581
        cname = 'apples'
582
        container_info = self.get_container_info(cname)
583
        last_modified = container_info['Last-Modified']
584
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
585
        t1_formats = map(t1.strftime, DATE_FORMATS)
586

    
587
        # Check not modified
588
        url = join_urls(self.pithos_path, self.user, cname)
589
        for t in t1_formats:
590
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
591
            self.assertEqual(r.status_code, 304)
592

    
593
        # modify account: add container
594
        _time.sleep(1)
595
        oname = self.upload_object(cname)[0]
596

    
597
        # Check modified
598
        objects = self.objects[cname].keys()
599
        objects.append(oname)
600
        for t in t1_formats:
601
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
602
            self.assertEqual(r.status_code, 200)
603
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
604

    
605
        container_info = self.get_container_info(cname)
606
        last_modified = container_info['Last-Modified']
607
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
608
        t2_formats = map(t2.strftime, DATE_FORMATS)
609

    
610
        # modify account: update account meta
611
        _time.sleep(1)
612
        self.update_container_meta(cname, {'foo': 'bar'})
613

    
614
        # Check modified
615
        for t in t2_formats:
616
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
617
            self.assertEqual(r.status_code, 200)
618
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
619

    
620
    def test_if_modified_since_invalid_date(self):
621
        cname = 'apples'
622
        url = join_urls(self.pithos_path, self.user, cname)
623
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
624
        self.assertEqual(r.status_code, 200)
625
        self.assertEqual(r.content.split('\n')[:-1],
626
                         sorted(self.objects['apples'].keys()))
627

    
628
    def test_if_not_modified_since(self):
629
        cname = 'apples'
630
        url = join_urls(self.pithos_path, self.user, cname)
631
        container_info = self.get_container_info(cname)
632
        last_modified = container_info['Last-Modified']
633
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
634

    
635
        # Check unmodified
636
        t1 = t + datetime.timedelta(seconds=1)
637
        t1_formats = map(t1.strftime, DATE_FORMATS)
638
        for t in t1_formats:
639
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
640
            self.assertEqual(r.status_code, 200)
641
            self.assertEqual(
642
                r.content.split('\n')[:-1],
643
                sorted(self.objects['apples']))
644

    
645
        # modify account: add container
646
        _time.sleep(2)
647
        self.upload_object(cname)
648

    
649
        container_info = self.get_container_info(cname)
650
        last_modified = container_info['Last-Modified']
651
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
652
        t2 = t - datetime.timedelta(seconds=1)
653
        t2_formats = map(t2.strftime, DATE_FORMATS)
654

    
655
        # Check modified
656
        for t in t2_formats:
657
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
658
            self.assertEqual(r.status_code, 412)
659

    
660
        # modify account: update account meta
661
        _time.sleep(1)
662
        self.update_container_meta(cname, {'foo': 'bar'})
663

    
664
        container_info = self.get_container_info(cname)
665
        last_modified = container_info['Last-Modified']
666
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
667
        t3 = t - datetime.timedelta(seconds=1)
668
        t3_formats = map(t3.strftime, DATE_FORMATS)
669

    
670
        # Check modified
671
        for t in t3_formats:
672
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
673
            self.assertEqual(r.status_code, 412)
674

    
675
    def test_if_unmodified_since(self):
676
        cname = 'apples'
677
        url = join_urls(self.pithos_path, self.user, cname)
678
        container_info = self.get_container_info(cname)
679
        last_modified = container_info['Last-Modified']
680
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
681
        t = t + datetime.timedelta(seconds=1)
682
        t_formats = map(t.strftime, DATE_FORMATS)
683

    
684
        for tf in t_formats:
685
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
686
            self.assertEqual(r.status_code, 200)
687
            self.assertEqual(
688
                r.content.split('\n')[:-1],
689
                sorted(self.objects['apples']))
690

    
691
    def test_if_unmodified_since_precondition_failed(self):
692
        cname = 'apples'
693
        url = join_urls(self.pithos_path, self.user, cname)
694
        container_info = self.get_container_info(cname)
695
        last_modified = container_info['Last-Modified']
696
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
697
        t = t - datetime.timedelta(seconds=1)
698
        t_formats = map(t.strftime, DATE_FORMATS)
699

    
700
        for tf in t_formats:
701
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
702
            self.assertEqual(r.status_code, 412)
703

    
704

    
705
class ContainerPut(PithosAPITest):
706
    def test_create(self):
707
        self.create_container('c1')
708
        self.list_containers()
709
        self.assertTrue('c1' in self.list_containers(format=None))
710

    
711
    def test_create_twice(self):
712
        self.create_container('c1')
713
        self.assertTrue('c1' in self.list_containers(format=None))
714
        r = self.create_container('c1')
715
        self.assertEqual(r.status_code, 202)
716
        self.assertTrue('c1' in self.list_containers(format=None))
717

    
718

    
719
class ContainerPost(PithosAPITest):
720
    def test_update_meta(self):
721
        cname = 'apples'
722
        self.create_container(cname)
723
        meta = {'test': 'test33', 'tost': 'tost22'}
724
        self.update_container_meta(cname, meta)
725
        info = self.get_container_info(cname)
726
        for k, v in meta.items():
727
            k = 'x-container-meta-%s' % k
728
            self.assertTrue(k in info)
729
            self.assertEqual(info[k], v)
730

    
731
    def test_quota(self):
732
        self.create_container('c1')
733

    
734
        url = join_urls(self.pithos_path, self.user, 'c1')
735
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
736
        self.assertEqual(r.status_code, 202)
737

    
738
        info = self.get_container_info('c1')
739
        self.assertTrue('x-container-policy-quota' in info)
740
        self.assertEqual(info['x-container-policy-quota'], '100')
741

    
742
        r = self.upload_object('c1', length=101, verify=False)[2]
743
        self.assertEqual(r.status_code, 413)
744

    
745
        url = join_urls(self.pithos_path, self.user, 'c1')
746
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='0')
747
        self.assertEqual(r.status_code, 202)
748

    
749
        r = self.upload_object('c1', length=1)
750

    
751

    
752
class ContainerDelete(PithosAPITest):
753
    def setUp(self):
754
        PithosAPITest.setUp(self)
755
        cnames = ['c1', 'c2']
756

    
757
        for c in cnames:
758
            self.create_container(c)
759

    
760
    def test_delete(self):
761
        url = join_urls(self.pithos_path, self.user, 'c1')
762
        r = self.delete(url)
763
        self.assertEqual(r.status_code, 204)
764
        self.assertTrue('c1' not in self.list_containers(format=None))
765

    
766
    def test_delete_non_empty(self):
767
        self.upload_object('c1')
768
        url = join_urls(self.pithos_path, self.user, 'c1')
769
        r = self.delete(url)
770
        self.assertEqual(r.status_code, 409)
771
        self.assertTrue('c1' in self.list_containers(format=None))
772

    
773
    def test_delete_invalid(self):
774
        url = join_urls(self.pithos_path, self.user, 'c3')
775
        r = self.delete(url)
776
        self.assertEqual(r.status_code, 404)
777

    
778
    def test_delete_contents(self):
779
        folder = self.create_folder('c1')[0]
780
        descendant = strnextling(folder)
781
        self.upload_object('c1', descendant)
782
        self.create_folder('c1', '%s/%s' % (folder, get_random_data(5)))[0]
783

    
784
        self.delete('%s?delimiter=/' % join_urls(
785
            self.pithos_path, self.user, 'c1'))
786
        self.assertEqual([], self.list_objects('c1'))
787
        self.assertTrue('c1' in self.list_containers(format=None))