Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / containers.py @ 78bd53a7

History | View | Annotate | Download (33.9 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data, get_random_name
40

    
41
from synnefo.lib import join_urls
42

    
43
import django.utils.simplejson as json
44
from django.http import urlencode
45

    
46
from xml.dom import minidom
47
from urllib import quote
48
import time as _time
49

    
50
import random
51
import datetime
52

    
53

    
54
class ContainerHead(PithosAPITest):
55
    def test_get_meta(self):
56
        self.create_container('apples')
57

    
58
        # populate with objects
59
        objects = {}
60
        for i in range(random.randint(1, 100)):
61

    
62
            # upload object
63
            meta = {'foo%s' % i: 'bar'}
64
            name, data, resp = self.upload_object('apples', **meta)
65
            objects[name] = data
66

    
67
        t1 = datetime.datetime.utcnow()
68
        url = join_urls(self.pithos_path, self.user, 'apples')
69
        r = self.head(url)
70
        self.assertEqual(int(r['X-Container-Object-Count']), len(objects))
71
        self.assertEqual(int(r['X-Container-Bytes-Used']),
72
                         sum([len(i) for i in objects.values()]))
73
        self.assertTrue('X-Container-Block-Size' in r)
74
        self.assertTrue('X-Container-Block-Hash' in r)
75
        self.assertTrue('X-Container-Until-Timestamp' not in r)
76
        self.assertEqual(r['X-Container-Policy-Versioning'], 'auto')
77
        self.assertEqual(int(r['X-Container-Policy-Quota']), 0)
78
        t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
79
        delta = (t2 - t1)
80
        threashold = datetime.timedelta(seconds=1)
81
        self.assertTrue(delta < threashold)
82
        self.assertTrue(r['X-Container-Object-Meta'])
83
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
84
            for i in range(len(objects)))
85

    
86
    def test_get_container_meta_until(self):
87
        self.create_container('apples')
88

    
89
        # populate with objects
90
        objects = {}
91
        metalist = []
92
        for i in range(random.randint(1, 100)):
93
            # upload object
94
            metakey = 'foo%s' % i
95
            meta = {metakey: 'bar'}
96
            name, data, resp = self.upload_object('apples', **meta)
97
            objects[name] = data
98
            metalist.append(metakey) 
99

    
100
        self.update_container_meta('apples', {'foo': 'bar'})
101

    
102
        container_info = self.get_container_info('apples')
103
        t = datetime.datetime.strptime(container_info['Last-Modified'],
104
                                       DATE_FORMATS[2])
105
        t1 = t + datetime.timedelta(seconds=1)
106
        until = int(_time.mktime(t1.timetuple()))
107

    
108
        _time.sleep(2)
109

    
110
        for i in range(random.randint(1, 100)):
111
            # upload object
112
            meta = {'foo%s' % i: 'bar'}
113
            self.upload_object('apples', **meta)
114

    
115
        self.update_container_meta('apples', {'quality': 'AAA'})
116

    
117
        container_info = self.get_container_info('apples')
118
        self.assertTrue('X-Container-Meta-Quality' in container_info)
119
        self.assertTrue('X-Container-Meta-Foo' in container_info)
120
        self.assertTrue('X-Container-Object-Count' in container_info)
121
        self.assertTrue(int(container_info['X-Container-Object-Count']) > len(objects))
122
        self.assertTrue('X-Container-Bytes-Used' in container_info)
123

    
124
        t = datetime.datetime.strptime(container_info['Last-Modified'],
125
                                       DATE_FORMATS[-1])
126
        last_modified = int(_time.mktime(t.timetuple()))
127
        assert until < last_modified
128

    
129
        container_info = self.get_container_info('apples', until=until)
130
        self.assertTrue('X-Container-Meta-Quality' not in container_info)
131
        self.assertTrue('X-Container-Meta-Foo' in container_info)
132
        self.assertTrue('X-Container-Until-Timestamp' in container_info)
133
        t = datetime.datetime.strptime(
134
            container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2])
135
        self.assertTrue(int(_time.mktime(t1.timetuple())) <= until)
136
        self.assertTrue('X-Container-Object-Count' in container_info)
137
        self.assertEqual(int(container_info['X-Container-Object-Count']), len(objects))
138
        self.assertTrue('X-Container-Bytes-Used' in container_info)
139
        self.assertEqual(int(container_info['X-Container-Bytes-Used']),
140
                         sum([len(data) for data in objects.values()]))
141
        self.assertTrue('X-Container-Object-Meta' in container_info)
142
        self.assertEqual(container_info['X-Container-Object-Meta'],
143
                         metalist) 
144

    
145

    
146
class ContainerGet(PithosAPITest):
147
    def setUp(self):
148
        PithosAPITest.setUp(self)
149

    
150
        self.cnames = ['pears', 'apples']
151
        self.objects = {}
152
        for c in self.cnames:
153
            self.create_container(c)
154

    
155
        self.objects['pears'] = {}
156
        for o in o_names[:8]:
157
            name, data, resp = self.upload_object('pears', o)
158
            self.objects['pears'][name] = data
159
        self.objects['apples'] = {}
160
        for o in o_names[8:]:
161
            name, data, resp = self.upload_object('apples', o)
162
            self.objects['apples'][name] = data
163

    
164
    def test_list_until(self):
165
        account_info = self.get_account_info()
166
        t = datetime.datetime.strptime(account_info['Last-Modified'],
167
                                       DATE_FORMATS[2])
168
        t1 = t + datetime.timedelta(seconds=1)
169
        until = int(_time.mktime(t1.timetuple()))
170

    
171
        _time.sleep(2)
172

    
173
        cname = self.cnames[0]
174
        self.upload_object(cname)
175

    
176
        url = join_urls(self.pithos_path, self.user, cname)
177
        r = self.get('%s?until=%s' % (url, until))
178
        self.assertTrue(r.status_code, 200)
179
        objects = r.content.split('\n')
180
        if '' in objects:
181
            objects.remove('')
182
        self.assertEqual(objects,
183
                         sorted(self.objects[cname].keys()))
184

    
185
        r = self.get('%s?until=%s&format=json' % (url, until))
186
        self.assertTrue(r.status_code, 200)
187
        try:
188
            objects = json.loads(r.content)
189
        except:
190
            self.fail('json format expected')
191
        self.assertEqual([o['name'] for o in objects],
192
                         sorted(self.objects[cname].keys()))
193

    
194
    def test_list_shared(self):
195
        # share an object
196
        cname = self.cnames[0]
197
        onames = self.objects[cname].keys()
198
        oname = onames.pop()
199
        url = join_urls(self.pithos_path, self.user, cname, oname)
200
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
201
        self.assertEqual(r.status_code, 202)
202

    
203
        # publish another object
204
        other = onames.pop()
205
        url = join_urls(self.pithos_path, self.user, cname, other)
206
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
207
        self.assertEqual(r.status_code, 202)
208

    
209
        # list shared and assert only the shared object is returned
210
        url = join_urls(self.pithos_path, self.user, cname)
211
        r = self.get('%s?shared=' % url)
212
        self.assertEqual(r.status_code, 200)
213
        objects = r.content.split('\n')
214
        if '' in objects:
215
            objects.remove('')
216
        self.assertEqual([oname], objects)
217

    
218
        # list detailed shared and assert only the shared object is returned
219
        url = join_urls(self.pithos_path, self.user, cname)
220
        r = self.get('%s?shared=&format=json' % url)
221
        self.assertEqual(r.status_code, 200)
222
        try:
223
            objects = json.loads(r.content)
224
        except:
225
            self.fail('json format expected')
226
        self.assertEqual([oname], [o['name'] for o in objects])
227
        self.assertTrue('x_object_sharing' in objects[0])
228
        self.assertTrue('x_object_public' not in objects[0])
229

    
230
        # publish the shared object and assert it is still listed in the
231
        # shared objects
232
        url = join_urls(self.pithos_path, self.user, cname, oname)
233
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
234
        self.assertEqual(r.status_code, 202)
235
        url = join_urls(self.pithos_path, self.user, cname)
236
        r = self.get('%s?shared=&format=json' % url)
237
        self.assertEqual(r.status_code, 200)
238
        try:
239
            objects = json.loads(r.content)
240
        except:
241
            self.fail('json format expected')
242
        self.assertEqual([oname], [o['name'] for o in objects])
243
        self.assertTrue('x_object_sharing' in objects[0])
244
        self.assertTrue('x_object_public' in objects[0])
245

    
246
        # create child object
247
        descendant = strnextling(oname)
248
        self.upload_object(cname, descendant)
249
        # request shared and assert child obejct is not listed
250
        url = join_urls(self.pithos_path, self.user, cname)
251
        r = self.get('%s?shared=' % url)
252
        self.assertEqual(r.status_code, 200)
253
        objects = r.content.split('\n')
254
        if '' in objects:
255
            objects.remove('')
256
        self.assertTrue(oname in objects)
257
        self.assertTrue(descendant not in objects)
258

    
259
        # check folder inheritance
260
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
261
        # create child object
262
        descendant = '%s/%s' % (oname, get_random_name())
263
        self.upload_object(cname, descendant)
264
        # request shared
265
        url = join_urls(self.pithos_path, self.user, cname)
266
        r = self.get('%s?shared=' % url)
267
        self.assertEqual(r.status_code, 200)
268
        objects = r.content.split('\n')
269
        if '' in objects:
270
            objects.remove('')
271
        self.assertTrue(oname in objects)
272
        self.assertTrue(descendant in objects)
273

    
274
    def test_list_public(self):
275
        # publish an object
276
        cname = self.cnames[0]
277
        onames = self.objects[cname].keys()
278
        oname = onames.pop()
279
        other = onames.pop()
280

    
281
        # publish an object
282
        url = join_urls(self.pithos_path, self.user, cname, oname)
283
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
284
        self.assertEqual(r.status_code, 202)
285

    
286
        # share another
287
        url = join_urls(self.pithos_path, self.user, cname, other)
288
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
289
        self.assertEqual(r.status_code, 202)
290

    
291
        # list public and assert only the public object is returned
292
        url = join_urls(self.pithos_path, self.user, cname)
293
        r = self.get('%s?public=' % url)
294
        objects = r.content.split('\n')
295
        self.assertEqual(r.status_code, 200)
296
        self.assertTrue(oname in r.content.split('\n'))
297
        (self.assertTrue(object not in objects) for object in o_names[1:])
298

    
299
        # list detailed public and assert only the public object is returned
300
        url = join_urls(self.pithos_path, self.user, cname)
301
        r = self.get('%s?public=&format=json' % url)
302
        self.assertEqual(r.status_code, 200)
303
        try:
304
            objects = json.loads(r.content)
305
        except:
306
            self.fail('json format expected')
307
        self.assertEqual([oname], [obj['name'] for obj in objects])
308
        self.assertTrue('x_object_sharing' not in objects[0])
309
        self.assertTrue('x_object_public' in objects[0])
310

    
311
        # share the public object and assert it is still listed in the
312
        # public objects
313
        url = join_urls(self.pithos_path, self.user, cname, oname)
314
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
315
        self.assertEqual(r.status_code, 202)
316
        url = join_urls(self.pithos_path, self.user, cname)
317
        r = self.get('%s?public=&format=json' % url)
318
        self.assertEqual(r.status_code, 200)
319
        try:
320
            objects = json.loads(r.content)
321
        except:
322
            self.fail('json format expected')
323
        self.assertEqual([oname], [obj['name'] for obj in objects])
324
        self.assertTrue('x_object_sharing' in objects[0])
325
        self.assertTrue('x_object_public' in objects[0])
326

    
327
        url = join_urls(self.pithos_path, self.user, cname)
328

    
329
        # Assert listing the container public contents is forbidden to not
330
        # shared users
331
        r = self.get('%s?public=&format=json' % url, user='bob')
332
        self.assertEqual(r.status_code, 403)
333

    
334
        # Assert forbidden public object listing to shared users
335
        r = self.get('%s?public=&format=json' % url, user='alice')
336
        self.assertEqual(r.status_code, 403)
337

    
338
        # create child object
339
        descendant = strnextling(oname)
340
        self.upload_object(cname, descendant)
341
        # request public and assert child obejct is not listed
342
        r = self.get('%s?public=' % url)
343
        objects = r.content.split('\n')
344
        if '' in objects:
345
            objects.remove('')
346
        self.assertEqual(r.status_code, 200)
347
        self.assertTrue(oname in objects)
348
        (self.assertTrue(o not in objects) for o in o_names[1:])
349

    
350
        # test folder inheritance
351
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
352
        # create child object
353
        descendant = '%s/%s' % (oname, get_random_name())
354
        self.upload_object(cname, descendant)
355
        # request public
356
        r = self.get('%s?public=' % url)
357
        self.assertEqual(r.status_code, 200)
358
        objects = r.content.split('\n')
359
        self.assertTrue(oname in objects)
360
        self.assertTrue(descendant not in objects)
361

    
362
    def test_list_shared_public(self):
363
        # publish an object
364
        cname = self.cnames[0]
365
        container_url = join_urls(self.pithos_path, self.user, cname)
366
        onames = self.objects[cname].keys()
367
        oname = onames.pop()
368
        url = join_urls(container_url, oname)
369
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
370
        self.assertEqual(r.status_code, 202)
371

    
372
        # share another
373
        other = onames.pop()
374
        url = join_urls(container_url, other)
375
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
376
        self.assertEqual(r.status_code, 202)
377

    
378
        # list shared and public objects and assert object is listed
379
        r = self.get('%s?shared=&public=&format=json' % container_url)
380
        self.assertEqual(r.status_code, 200)
381
        objects = json.loads(r.content)
382
        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
383
        for o in objects:
384
            if o['name'] == oname:
385
                self.assertTrue('x_object_public' in o.keys())
386
            elif o['name'] == other:
387
                self.assertTrue('x_object_sharing' in o.keys())
388

    
389
        # assert not listing shared and public to a not shared user
390
        r = self.get('%s?shared=&public=&format=json' % container_url,
391
                     user='bob')
392
        self.assertEqual(r.status_code, 403)
393

    
394
        # assert not listing public to a shared user
395
        r = self.get('%s?shared=&public=&format=json' % container_url,
396
                     user='alice')
397
        self.assertEqual(r.status_code, 403)
398

    
399
        # create child object
400
        descendant = strnextling(oname)
401
        self.upload_object(cname, descendant)
402
        # request public and assert child obejct is not listed
403
        r = self.get('%s?shared=&public=' % container_url)
404
        objects = r.content.split('\n')
405
        if '' in objects:
406
            objects.remove('')
407
        self.assertEqual(r.status_code, 200)
408
        self.assertTrue(oname in objects)
409
        (self.assertTrue(o not in objects) for o in o_names[1:])
410

    
411
        # test folder inheritance
412
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
413
        # create child object
414
        descendant = '%s/%s' % (oname, get_random_name())
415
        self.upload_object(cname, descendant)
416
        # request public
417
        r = self.get('%s?shared=&public=' % container_url)
418
        self.assertEqual(r.status_code, 200)
419
        objects = r.content.split('\n')
420
        if '' in objects:
421
            objects.remove('')
422
        self.assertTrue(oname in objects)
423
        self.assertTrue(descendant not in objects)
424

    
425
    def test_list_objects(self):
426
        cname = self.cnames[0]
427
        url = join_urls(self.pithos_path, self.user, cname)
428
        r = self.get(url)
429
        self.assertTrue(r.status_code, 200)
430
        objects = r.content.split('\n')
431
        if '' in objects:
432
            objects.remove('')
433
        self.assertEqual(objects, sorted(self.objects[cname].keys()))
434

    
435
    def test_list_objects_containing_slash(self):
436
        self.create_container('test')
437
        self.upload_object('test', quote('/objectname', ''))
438

    
439
        url = join_urls(self.pithos_path, self.user, 'test')
440

    
441
        r = self.get(url)
442
        objects = r.content.split('\n')
443
        if '' in objects:
444
            objects.remove('')
445
        self.assertEqual(objects, ['/objectname'])
446

    
447
        r = self.get('%s?format=json' % url)
448
        try:
449
            objects = json.loads(r.content)
450
        except:
451
            self.fail('json format expected')
452
        self.assertEqual([o['name'] for o in objects], ['/objectname'])
453

    
454
        r = self.get('%s?format=xml' % url)
455
        try:
456
            objects = minidom.parseString(r.content)
457
        except:
458
            self.fail('xml format expected')
459
        self.assertEqual(
460
            [n.firstChild.data for n in objects.getElementsByTagName('name')],
461
            ['/objectname'])
462

    
463
    def test_list_objects_with_limit_marker(self):
464
        cname = self.cnames[0]
465
        url = join_urls(self.pithos_path, self.user, cname)
466
        r = self.get('%s?limit=qwert' % url)
467
        self.assertTrue(r.status_code != 500)
468

    
469
        r = self.get('%s?limit=2' % url)
470
        self.assertEqual(r.status_code, 200)
471
        objects = r.content.split('\n')
472
        if '' in objects:
473
            objects.remove('')
474

    
475
        onames = sorted(self.objects[cname].keys())
476
        self.assertEqual(objects, onames[:2])
477

    
478
        markers = ['How To Win Friends And Influence People.pdf',
479
                   'moms_birthday.jpg']
480
        limit = 4
481
        for m in markers:
482
            r = self.get('%s?limit=%s&marker=%s' % (url, limit, m))
483
            objects = r.content.split('\n')
484
            if '' in objects:
485
                objects.remove('')
486
            start = onames.index(m) + 1
487
            end = start + limit
488
            end = end if len(onames) >= end else len(onames)
489
            self.assertEqual(objects, onames[start:end])
490

    
491
    @pithos_test_settings(API_LIST_LIMIT=10)
492
    def test_list_limit_exceeds(self):
493
        self.create_container('container')
494
        url = join_urls(self.pithos_path, self.user, 'container')
495

    
496
        for _ in range(pithos_settings.API_LIST_LIMIT + 1):
497
            self.upload_object('container')
498

    
499
        r = self.get('%s?format=json' % url)
500
        try:
501
            objects = json.loads(r.content)
502
        except:
503
            self.fail('json format expected')
504
        self.assertEqual(pithos_settings.API_LIST_LIMIT,
505
                         len(objects))
506

    
507
    def test_list_pseudo_hierarchical_folders(self):
508
        url = join_urls(self.pithos_path, self.user, 'apples')
509
        r = self.get('%s?prefix=photos&delimiter=/' % url)
510
        self.assertEqual(r.status_code, 200)
511
        objects = r.content.split('\n')
512
        if '' in objects:
513
            objects.remove('')
514
        self.assertEquals(
515
            ['photos/animals/', 'photos/me.jpg', 'photos/plants/'],
516
            objects)
517

    
518
        r = self.get('%s?prefix=photos/animals&delimiter=/' % url)
519
        objects = r.content.split('\n')
520
        if '' in objects:
521
            objects.remove('')
522
        self.assertEquals(
523
            ['photos/animals/cats/', 'photos/animals/dogs/'], objects)
524

    
525
        r = self.get('%s?path=photos' % url)
526
        objects = r.content.split('\n')
527
        if '' in objects:
528
            objects.remove('')
529
        self.assertEquals(['photos/me.jpg'], objects)
530

    
531
    def test_extended_list_json(self):
532
        url = join_urls(self.pithos_path, self.user, 'apples')
533
        params = {'format': 'json', 'limit': 2, 'prefix': 'photos/animals',
534
                  'delimiter': '/'}
535
        r = self.get('%s?%s' % (url, urlencode(params)))
536
        self.assertEqual(r.status_code, 200)
537
        try:
538
            objects = json.loads(r.content)
539
        except:
540
            self.fail('json format expected')
541
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
542
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
543

    
544
    def test_extended_list_xml(self):
545
        url = join_urls(self.pithos_path, self.user, 'apples')
546
        params = {'format': 'xml', 'limit': 4, 'prefix': 'photos',
547
                  'delimiter': '/'}
548
        r = self.get('%s?%s' % (url, urlencode(params)))
549
        self.assertEqual(r.status_code, 200)
550
        try:
551
            xml = minidom.parseString(r.content)
552
        except:
553
            self.fail('xml format expected')
554
        self.assert_extended(xml, 'xml', 'object', size=4)
555
        dirs = xml.getElementsByTagName('subdir')
556
        self.assertEqual(len(dirs), 2)
557
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
558
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
559

    
560
        objects = xml.getElementsByTagName('name')
561
        self.assertEqual(len(objects), 1)
562
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
563

    
564
    def test_list_meta_double_matching(self):
565
        # update object meta
566
        cname = 'apples'
567
        container_url = join_urls(self.pithos_path, self.user, cname)
568
        oname = self.objects[cname].keys().pop()
569
        meta = {'quality': 'aaa', 'stock': 'true'}
570
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
571
                       for k, v in meta.iteritems())
572
        object_url = join_urls(container_url, oname)
573
        self.post(object_url, content_type='', **headers)
574

    
575
        # list objects that satisfy the criteria
576
        r = self.get('%s?meta=Quality,Stock' % container_url)
577
        self.assertEqual(r.status_code, 200)
578
        objects = r.content.split('\n')
579
        if '' in objects:
580
            objects.remove('')
581
        self.assertEqual(objects, [oname])
582

    
583
    def test_list_using_meta(self):
584
        # update object meta
585
        cname = 'apples'
586
        container_url = join_urls(self.pithos_path, self.user, cname)
587

    
588
        onames = self.objects[cname].keys()
589
        url = join_urls(container_url, onames[0])
590
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
591
        self.assertEqual(r.status_code, 202)
592

    
593
        url = join_urls(container_url, onames[1])
594
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
595
        self.assertEqual(r.status_code, 202)
596

    
597
        url = join_urls(container_url, onames[2])
598
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
599
        self.assertEqual(r.status_code, 202)
600

    
601
        url = join_urls(container_url, onames[3])
602
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
603
        self.assertEqual(r.status_code, 202)
604

    
605
        # test multiple existence criteria matches
606
        r = self.get('%s?meta=Quality,Stock' % container_url)
607
        self.assertEqual(r.status_code, 200)
608
        objects = r.content.split('\n')
609
        if '' in objects:
610
            objects.remove('')
611
        self.assertTrue(objects, sorted(onames))
612

    
613
        # list objects that satisfy the existence criteria
614
        r = self.get('%s?meta=Stock' % container_url)
615
        self.assertEqual(r.status_code, 200)
616
        objects = r.content.split('\n')
617
        if '' in objects:
618
            objects.remove('')
619
        self.assertTrue(objects, sorted(onames[2:]))
620

    
621
        # test case insensitive existence criteria matching
622
        r = self.get('%s?meta=quality' % container_url)
623
        self.assertEqual(r.status_code, 200)
624
        objects = r.content.split('\n')
625
        if '' in objects:
626
            objects.remove('')
627
        self.assertTrue(objects, sorted(onames[:2]))
628

    
629
        # test do not all existencecriteria match
630
        r = self.get('%s?meta=Quality,Foo' % container_url)
631
        self.assertEqual(r.status_code, 200)
632
        objects = r.content.split('\n')
633
        if '' in objects:
634
            objects.remove('')
635
        self.assertTrue(objects, sorted(onames[:2]))
636

    
637
        # test equals criteria
638
        r = self.get('%s?meta=%s' % (container_url, quote('Quality=aaa')))
639
        self.assertEqual(r.status_code, 200)
640
        objects = r.content.split('\n')
641
        if '' in objects:
642
            objects.remove('')
643
        self.assertTrue(objects, [onames[0]])
644

    
645
        # test not equals criteria
646
        r = self.get('%s?meta=%s' % (container_url, quote('Quality!=aaa')))
647
        self.assertEqual(r.status_code, 200)
648
        objects = r.content.split('\n')
649
        if '' in objects:
650
            objects.remove('')
651
        self.assertTrue(objects, [onames[1]])
652

    
653
        # test lte criteria
654
        r = self.get('%s?meta=%s' % (container_url, quote('Stock<=120')))
655
        self.assertEqual(r.status_code, 200)
656
        objects = r.content.split('\n')
657
        if '' in objects:
658
            objects.remove('')
659
        self.assertTrue(objects, [onames[2]])
660

    
661
        # test gte criteria
662
        r = self.get('%s?meta=%s' % (container_url, quote('Stock>=200')))
663
        self.assertEqual(r.status_code, 200)
664
        objects = r.content.split('\n')
665
        if '' in objects:
666
            objects.remove('')
667
        self.assertTrue(objects, [onames[3]])
668

    
669
    def test_if_modified_since(self):
670
        cname = 'apples'
671
        container_info = self.get_container_info(cname)
672
        last_modified = container_info['Last-Modified']
673
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
674
        t1_formats = map(t1.strftime, DATE_FORMATS)
675

    
676
        # Check not modified
677
        url = join_urls(self.pithos_path, self.user, cname)
678
        for t in t1_formats:
679
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
680
            self.assertEqual(r.status_code, 304)
681

    
682
        # modify account: add container
683
        _time.sleep(1)
684
        oname = self.upload_object(cname)[0]
685

    
686
        # Check modified
687
        objects = self.objects[cname].keys()
688
        objects.append(oname)
689
        for t in t1_formats:
690
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
691
            self.assertEqual(r.status_code, 200)
692
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
693

    
694
        container_info = self.get_container_info(cname)
695
        last_modified = container_info['Last-Modified']
696
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
697
        t2_formats = map(t2.strftime, DATE_FORMATS)
698

    
699
        # modify account: update account meta
700
        _time.sleep(1)
701
        self.update_container_meta(cname, {'foo': 'bar'})
702

    
703
        # Check modified
704
        for t in t2_formats:
705
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
706
            self.assertEqual(r.status_code, 200)
707
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
708

    
709
    def test_if_modified_since_invalid_date(self):
710
        cname = 'apples'
711
        url = join_urls(self.pithos_path, self.user, cname)
712
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
713
        self.assertEqual(r.status_code, 200)
714
        self.assertEqual(r.content.split('\n')[:-1],
715
                         sorted(self.objects['apples'].keys()))
716

    
717
    def test_if_not_modified_since(self):
718
        cname = 'apples'
719
        url = join_urls(self.pithos_path, self.user, cname)
720
        container_info = self.get_container_info(cname)
721
        last_modified = container_info['Last-Modified']
722
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
723

    
724
        # Check unmodified
725
        t1 = t + datetime.timedelta(seconds=1)
726
        t1_formats = map(t1.strftime, DATE_FORMATS)
727
        for t in t1_formats:
728
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
729
            self.assertEqual(r.status_code, 200)
730
            self.assertEqual(
731
                r.content.split('\n')[:-1],
732
                sorted(self.objects['apples']))
733

    
734
        # modify account: add container
735
        _time.sleep(2)
736
        self.upload_object(cname)
737

    
738
        container_info = self.get_container_info(cname)
739
        last_modified = container_info['Last-Modified']
740
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
741
        t2 = t - datetime.timedelta(seconds=1)
742
        t2_formats = map(t2.strftime, DATE_FORMATS)
743

    
744
        # Check modified
745
        for t in t2_formats:
746
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
747
            self.assertEqual(r.status_code, 412)
748

    
749
        # modify account: update account meta
750
        _time.sleep(1)
751
        self.update_container_meta(cname, {'foo': 'bar'})
752

    
753
        container_info = self.get_container_info(cname)
754
        last_modified = container_info['Last-Modified']
755
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
756
        t3 = t - datetime.timedelta(seconds=1)
757
        t3_formats = map(t3.strftime, DATE_FORMATS)
758

    
759
        # Check modified
760
        for t in t3_formats:
761
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
762
            self.assertEqual(r.status_code, 412)
763

    
764
    def test_if_unmodified_since(self):
765
        cname = 'apples'
766
        url = join_urls(self.pithos_path, self.user, cname)
767
        container_info = self.get_container_info(cname)
768
        last_modified = container_info['Last-Modified']
769
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
770
        t = t + datetime.timedelta(seconds=1)
771
        t_formats = map(t.strftime, DATE_FORMATS)
772

    
773
        for tf in t_formats:
774
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
775
            self.assertEqual(r.status_code, 200)
776
            self.assertEqual(
777
                r.content.split('\n')[:-1],
778
                sorted(self.objects['apples']))
779

    
780
    def test_if_unmodified_since_precondition_failed(self):
781
        cname = 'apples'
782
        url = join_urls(self.pithos_path, self.user, cname)
783
        container_info = self.get_container_info(cname)
784
        last_modified = container_info['Last-Modified']
785
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
786
        t = t - datetime.timedelta(seconds=1)
787
        t_formats = map(t.strftime, DATE_FORMATS)
788

    
789
        for tf in t_formats:
790
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
791
            self.assertEqual(r.status_code, 412)
792

    
793

    
794
class ContainerPut(PithosAPITest):
795
    def test_create(self):
796
        self.create_container('c1')
797
        self.list_containers()
798
        self.assertTrue('c1' in self.list_containers(format=None))
799

    
800
    def test_create_twice(self):
801
        self.create_container('c1')
802
        self.assertTrue('c1' in self.list_containers(format=None))
803
        r = self.create_container('c1')[-1]
804
        self.assertEqual(r.status_code, 202)
805
        self.assertTrue('c1' in self.list_containers(format=None))
806

    
807

    
808
class ContainerPost(PithosAPITest):
809
    def test_update_meta(self):
810
        cname = 'apples'
811
        self.create_container(cname)
812
        meta = {'test': 'test33', 'tost': 'tost22'}
813
        self.update_container_meta(cname, meta)
814
        info = self.get_container_info(cname)
815
        for k, v in meta.items():
816
            k = 'x-container-meta-%s' % k
817
            self.assertTrue(k in info)
818
            self.assertEqual(info[k], v)
819

    
820
    def test_quota(self):
821
        self.create_container('c1')
822

    
823
        url = join_urls(self.pithos_path, self.user, 'c1')
824
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
825
        self.assertEqual(r.status_code, 202)
826

    
827
        info = self.get_container_info('c1')
828
        self.assertTrue('x-container-policy-quota' in info)
829
        self.assertEqual(info['x-container-policy-quota'], '100')
830

    
831
        r = self.upload_object('c1', length=101, verify=False)[2]
832
        self.assertEqual(r.status_code, 413)
833

    
834
        url = join_urls(self.pithos_path, self.user, 'c1')
835
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='0')
836
        self.assertEqual(r.status_code, 202)
837

    
838
        r = self.upload_object('c1', length=1)
839

    
840

    
841
class ContainerDelete(PithosAPITest):
842
    def setUp(self):
843
        PithosAPITest.setUp(self)
844
        cnames = ['c1', 'c2']
845

    
846
        for c in cnames:
847
            self.create_container(c)
848

    
849
    def test_delete(self):
850
        url = join_urls(self.pithos_path, self.user, 'c1')
851
        r = self.delete(url)
852
        self.assertEqual(r.status_code, 204)
853
        self.assertTrue('c1' not in self.list_containers(format=None))
854

    
855
    def test_delete_non_empty(self):
856
        self.upload_object('c1')
857
        url = join_urls(self.pithos_path, self.user, 'c1')
858
        r = self.delete(url)
859
        self.assertEqual(r.status_code, 409)
860
        self.assertTrue('c1' in self.list_containers(format=None))
861

    
862
    def test_delete_invalid(self):
863
        url = join_urls(self.pithos_path, self.user, 'c3')
864
        r = self.delete(url)
865
        self.assertEqual(r.status_code, 404)
866

    
867
    def test_delete_contents(self):
868
        folder = self.create_folder('c1')[0]
869
        descendant = strnextling(folder)
870
        self.upload_object('c1', descendant)
871
        self.create_folder('c1', '%s/%s' % (folder, get_random_data(5)))[0]
872

    
873
        self.delete('%s?delimiter=/' % join_urls(
874
            self.pithos_path, self.user, 'c1'))
875
        self.assertEqual([], self.list_objects('c1'))
876
        self.assertTrue('c1' in self.list_containers(format=None))