Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / containers.py @ 84c67d8e

History | View | Annotate | Download (34 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from pithos.api.test import (PithosAPITest, DATE_FORMATS, o_names,
38
                             pithos_settings, pithos_test_settings)
39
from pithos.api.test.util import strnextling, get_random_data, get_random_name
40

    
41
from synnefo.lib import join_urls
42

    
43
import django.utils.simplejson as json
44
from django.http import urlencode
45

    
46
from xml.dom import minidom
47
from urllib import quote
48
import time as _time
49

    
50
import random
51
import datetime
52

    
53

    
54
class ContainerHead(PithosAPITest):
55
    def test_get_meta(self):
56
        self.create_container('apples')
57

    
58
        # populate with objects
59
        objects = {}
60
        for i in range(random.randint(1, 100)):
61

    
62
            # upload object
63
            meta = {'foo%s' % i: 'bar'}
64
            name, data, resp = self.upload_object('apples', **meta)
65
            objects[name] = data
66

    
67
        t1 = datetime.datetime.utcnow()
68
        url = join_urls(self.pithos_path, self.user, 'apples')
69
        r = self.head(url)
70
        self.assertEqual(int(r['X-Container-Object-Count']), len(objects))
71
        self.assertEqual(int(r['X-Container-Bytes-Used']),
72
                         sum([len(i) for i in objects.values()]))
73
        self.assertTrue('X-Container-Block-Size' in r)
74
        self.assertTrue('X-Container-Block-Hash' in r)
75
        self.assertTrue('X-Container-Until-Timestamp' not in r)
76
        self.assertEqual(r['X-Container-Policy-Versioning'], 'auto')
77
        self.assertEqual(int(r['X-Container-Policy-Quota']), 0)
78
        t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
79
        delta = (t2 - t1)
80
        threashold = datetime.timedelta(seconds=1)
81
        self.assertTrue(delta < threashold)
82
        self.assertTrue(r['X-Container-Object-Meta'])
83
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
84
            for i in range(len(objects)))
85

    
86
    def test_get_container_meta_until(self):
87
        self.create_container('apples')
88

    
89
        # populate with objects
90
        objects = {}
91
        metalist = []
92
        for i in range(random.randint(1, 100)):
93
            # upload object
94
            metakey = 'Foo%s' % i
95
            meta = {metakey: 'bar'}
96
            name, data, resp = self.upload_object('apples', **meta)
97
            objects[name] = data
98
            metalist.append(metakey) 
99

    
100
        self.update_container_meta('apples', {'foo': 'bar'})
101

    
102
        container_info = self.get_container_info('apples')
103
        t = datetime.datetime.strptime(container_info['Last-Modified'],
104
                                       DATE_FORMATS[2])
105
        t1 = t + datetime.timedelta(seconds=1)
106
        until = int(_time.mktime(t1.timetuple()))
107

    
108
        _time.sleep(2)
109

    
110
        for i in range(random.randint(1, 100)):
111
            # upload object
112
            meta = {'foo%s' % i: 'bar'}
113
            self.upload_object('apples', **meta)
114

    
115
        self.update_container_meta('apples', {'quality': 'AAA'})
116

    
117
        container_info = self.get_container_info('apples')
118
        self.assertTrue('X-Container-Meta-Quality' in container_info)
119
        self.assertTrue('X-Container-Meta-Foo' in container_info)
120
        self.assertTrue('X-Container-Object-Count' in container_info)
121
        self.assertTrue(int(container_info['X-Container-Object-Count']) >
122
                        len(objects))
123
        self.assertTrue('X-Container-Bytes-Used' in container_info)
124

    
125
        t = datetime.datetime.strptime(container_info['Last-Modified'],
126
                                       DATE_FORMATS[-1])
127
        last_modified = int(_time.mktime(t.timetuple()))
128
        assert until < last_modified
129

    
130
        container_info = self.get_container_info('apples', until=until)
131
        self.assertTrue('X-Container-Meta-Quality' not in container_info)
132
        self.assertTrue('X-Container-Meta-Foo' in container_info)
133
        self.assertTrue('X-Container-Until-Timestamp' in container_info)
134
        t = datetime.datetime.strptime(
135
            container_info['X-Container-Until-Timestamp'], DATE_FORMATS[2])
136
        self.assertTrue(int(_time.mktime(t1.timetuple())) <= until)
137
        self.assertTrue('X-Container-Object-Count' in container_info)
138
        self.assertEqual(int(container_info['X-Container-Object-Count']),
139
                         len(objects))
140
        self.assertTrue('X-Container-Bytes-Used' in container_info)
141
        self.assertEqual(int(container_info['X-Container-Bytes-Used']),
142
                         sum([len(data) for data in objects.values()]))
143
        self.assertTrue('X-Container-Object-Meta' in container_info)
144
        self.assertEqual(
145
            sorted(container_info['X-Container-Object-Meta'].split(',')),
146
            sorted(metalist))
147

    
148

    
149
class ContainerGet(PithosAPITest):
150
    def setUp(self):
151
        PithosAPITest.setUp(self)
152

    
153
        self.cnames = ['pears', 'apples']
154
        self.objects = {}
155
        for c in self.cnames:
156
            self.create_container(c)
157

    
158
        self.objects['pears'] = {}
159
        for o in o_names[:8]:
160
            name, data, resp = self.upload_object('pears', o)
161
            self.objects['pears'][name] = data
162
        self.objects['apples'] = {}
163
        for o in o_names[8:]:
164
            name, data, resp = self.upload_object('apples', o)
165
            self.objects['apples'][name] = data
166

    
167
    def test_list_until(self):
168
        account_info = self.get_account_info()
169
        t = datetime.datetime.strptime(account_info['Last-Modified'],
170
                                       DATE_FORMATS[2])
171
        t1 = t + datetime.timedelta(seconds=1)
172
        until = int(_time.mktime(t1.timetuple()))
173

    
174
        _time.sleep(2)
175

    
176
        cname = self.cnames[0]
177
        self.upload_object(cname)
178

    
179
        url = join_urls(self.pithos_path, self.user, cname)
180
        r = self.get('%s?until=%s' % (url, until))
181
        self.assertTrue(r.status_code, 200)
182
        objects = r.content.split('\n')
183
        if '' in objects:
184
            objects.remove('')
185
        self.assertEqual(objects,
186
                         sorted(self.objects[cname].keys()))
187

    
188
        r = self.get('%s?until=%s&format=json' % (url, until))
189
        self.assertTrue(r.status_code, 200)
190
        try:
191
            objects = json.loads(r.content)
192
        except:
193
            self.fail('json format expected')
194
        self.assertEqual([o['name'] for o in objects],
195
                         sorted(self.objects[cname].keys()))
196

    
197
    def test_list_shared(self):
198
        # share an object
199
        cname = self.cnames[0]
200
        onames = self.objects[cname].keys()
201
        oname = onames.pop()
202
        url = join_urls(self.pithos_path, self.user, cname, oname)
203
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=*')
204
        self.assertEqual(r.status_code, 202)
205

    
206
        # publish another object
207
        other = onames.pop()
208
        url = join_urls(self.pithos_path, self.user, cname, other)
209
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
210
        self.assertEqual(r.status_code, 202)
211

    
212
        # list shared and assert only the shared object is returned
213
        url = join_urls(self.pithos_path, self.user, cname)
214
        r = self.get('%s?shared=' % url)
215
        self.assertEqual(r.status_code, 200)
216
        objects = r.content.split('\n')
217
        if '' in objects:
218
            objects.remove('')
219
        self.assertEqual([oname], objects)
220

    
221
        # list detailed shared and assert only the shared object is returned
222
        url = join_urls(self.pithos_path, self.user, cname)
223
        r = self.get('%s?shared=&format=json' % url)
224
        self.assertEqual(r.status_code, 200)
225
        try:
226
            objects = json.loads(r.content)
227
        except:
228
            self.fail('json format expected')
229
        self.assertEqual([oname], [o['name'] for o in objects])
230
        self.assertTrue('x_object_sharing' in objects[0])
231
        self.assertTrue('x_object_public' not in objects[0])
232

    
233
        # publish the shared object and assert it is still listed in the
234
        # shared objects
235
        url = join_urls(self.pithos_path, self.user, cname, oname)
236
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
237
        self.assertEqual(r.status_code, 202)
238
        url = join_urls(self.pithos_path, self.user, cname)
239
        r = self.get('%s?shared=&format=json' % url)
240
        self.assertEqual(r.status_code, 200)
241
        try:
242
            objects = json.loads(r.content)
243
        except:
244
            self.fail('json format expected')
245
        self.assertEqual([oname], [o['name'] for o in objects])
246
        self.assertTrue('x_object_sharing' in objects[0])
247
        self.assertTrue('x_object_public' in objects[0])
248

    
249
        # create child object
250
        descendant = strnextling(oname)
251
        self.upload_object(cname, descendant)
252
        # request shared and assert child obejct is not listed
253
        url = join_urls(self.pithos_path, self.user, cname)
254
        r = self.get('%s?shared=' % url)
255
        self.assertEqual(r.status_code, 200)
256
        objects = r.content.split('\n')
257
        if '' in objects:
258
            objects.remove('')
259
        self.assertTrue(oname in objects)
260
        self.assertTrue(descendant not in objects)
261

    
262
        # check folder inheritance
263
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
264
        # create child object
265
        descendant = '%s/%s' % (oname, get_random_name())
266
        self.upload_object(cname, descendant)
267
        # request shared
268
        url = join_urls(self.pithos_path, self.user, cname)
269
        r = self.get('%s?shared=' % url)
270
        self.assertEqual(r.status_code, 200)
271
        objects = r.content.split('\n')
272
        if '' in objects:
273
            objects.remove('')
274
        self.assertTrue(oname in objects)
275
        self.assertTrue(descendant in objects)
276

    
277
    def test_list_public(self):
278
        # publish an object
279
        cname = self.cnames[0]
280
        onames = self.objects[cname].keys()
281
        oname = onames.pop()
282
        other = onames.pop()
283

    
284
        # publish an object
285
        url = join_urls(self.pithos_path, self.user, cname, oname)
286
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
287
        self.assertEqual(r.status_code, 202)
288

    
289
        # share another
290
        url = join_urls(self.pithos_path, self.user, cname, other)
291
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
292
        self.assertEqual(r.status_code, 202)
293

    
294
        # list public and assert only the public object is returned
295
        url = join_urls(self.pithos_path, self.user, cname)
296
        r = self.get('%s?public=' % url)
297
        objects = r.content.split('\n')
298
        self.assertEqual(r.status_code, 200)
299
        self.assertTrue(oname in r.content.split('\n'))
300
        (self.assertTrue(object not in objects) for object in o_names[1:])
301

    
302
        # list detailed public and assert only the public object is returned
303
        url = join_urls(self.pithos_path, self.user, cname)
304
        r = self.get('%s?public=&format=json' % url)
305
        self.assertEqual(r.status_code, 200)
306
        try:
307
            objects = json.loads(r.content)
308
        except:
309
            self.fail('json format expected')
310
        self.assertEqual([oname], [obj['name'] for obj in objects])
311
        self.assertTrue('x_object_sharing' not in objects[0])
312
        self.assertTrue('x_object_public' in objects[0])
313

    
314
        # share the public object and assert it is still listed in the
315
        # public objects
316
        url = join_urls(self.pithos_path, self.user, cname, oname)
317
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
318
        self.assertEqual(r.status_code, 202)
319
        url = join_urls(self.pithos_path, self.user, cname)
320
        r = self.get('%s?public=&format=json' % url)
321
        self.assertEqual(r.status_code, 200)
322
        try:
323
            objects = json.loads(r.content)
324
        except:
325
            self.fail('json format expected')
326
        self.assertEqual([oname], [obj['name'] for obj in objects])
327
        self.assertTrue('x_object_sharing' in objects[0])
328
        self.assertTrue('x_object_public' in objects[0])
329

    
330
        url = join_urls(self.pithos_path, self.user, cname)
331

    
332
        # Assert listing the container public contents is forbidden to not
333
        # shared users
334
        r = self.get('%s?public=&format=json' % url, user='bob')
335
        self.assertEqual(r.status_code, 403)
336

    
337
        # Assert forbidden public object listing to shared users
338
        r = self.get('%s?public=&format=json' % url, user='alice')
339
        self.assertEqual(r.status_code, 403)
340

    
341
        # create child object
342
        descendant = strnextling(oname)
343
        self.upload_object(cname, descendant)
344
        # request public and assert child obejct is not listed
345
        r = self.get('%s?public=' % url)
346
        objects = r.content.split('\n')
347
        if '' in objects:
348
            objects.remove('')
349
        self.assertEqual(r.status_code, 200)
350
        self.assertTrue(oname in objects)
351
        (self.assertTrue(o not in objects) for o in o_names[1:])
352

    
353
        # test folder inheritance
354
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
355
        # create child object
356
        descendant = '%s/%s' % (oname, get_random_name())
357
        self.upload_object(cname, descendant)
358
        # request public
359
        r = self.get('%s?public=' % url)
360
        self.assertEqual(r.status_code, 200)
361
        objects = r.content.split('\n')
362
        self.assertTrue(oname in objects)
363
        self.assertTrue(descendant not in objects)
364

    
365
    def test_list_shared_public(self):
366
        # publish an object
367
        cname = self.cnames[0]
368
        container_url = join_urls(self.pithos_path, self.user, cname)
369
        onames = self.objects[cname].keys()
370
        oname = onames.pop()
371
        url = join_urls(container_url, oname)
372
        r = self.post(url, content_type='', HTTP_X_OBJECT_PUBLIC='true')
373
        self.assertEqual(r.status_code, 202)
374

    
375
        # share another
376
        other = onames.pop()
377
        url = join_urls(container_url, other)
378
        r = self.post(url, content_type='', HTTP_X_OBJECT_SHARING='read=alice')
379
        self.assertEqual(r.status_code, 202)
380

    
381
        # list shared and public objects and assert object is listed
382
        r = self.get('%s?shared=&public=&format=json' % container_url)
383
        self.assertEqual(r.status_code, 200)
384
        objects = json.loads(r.content)
385
        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
386
        for o in objects:
387
            if o['name'] == oname:
388
                self.assertTrue('x_object_public' in o.keys())
389
            elif o['name'] == other:
390
                self.assertTrue('x_object_sharing' in o.keys())
391

    
392
        # assert not listing shared and public to a not shared user
393
        r = self.get('%s?shared=&public=&format=json' % container_url,
394
                     user='bob')
395
        self.assertEqual(r.status_code, 403)
396

    
397
        # assert not listing public to a shared user
398
        r = self.get('%s?shared=&public=&format=json' % container_url,
399
                     user='alice')
400
        self.assertEqual(r.status_code, 403)
401

    
402
        # create child object
403
        descendant = strnextling(oname)
404
        self.upload_object(cname, descendant)
405
        # request public and assert child obejct is not listed
406
        r = self.get('%s?shared=&public=' % container_url)
407
        objects = r.content.split('\n')
408
        if '' in objects:
409
            objects.remove('')
410
        self.assertEqual(r.status_code, 200)
411
        self.assertTrue(oname in objects)
412
        (self.assertTrue(o not in objects) for o in o_names[1:])
413

    
414
        # test folder inheritance
415
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
416
        # create child object
417
        descendant = '%s/%s' % (oname, get_random_name())
418
        self.upload_object(cname, descendant)
419
        # request public
420
        r = self.get('%s?shared=&public=' % container_url)
421
        self.assertEqual(r.status_code, 200)
422
        objects = r.content.split('\n')
423
        if '' in objects:
424
            objects.remove('')
425
        self.assertTrue(oname in objects)
426
        self.assertTrue(descendant not in objects)
427

    
428
    def test_list_objects(self):
429
        cname = self.cnames[0]
430
        url = join_urls(self.pithos_path, self.user, cname)
431
        r = self.get(url)
432
        self.assertTrue(r.status_code, 200)
433
        objects = r.content.split('\n')
434
        if '' in objects:
435
            objects.remove('')
436
        self.assertEqual(objects, sorted(self.objects[cname].keys()))
437

    
438
    def test_list_objects_containing_slash(self):
439
        self.create_container('test')
440
        self.upload_object('test', quote('/objectname', ''))
441

    
442
        url = join_urls(self.pithos_path, self.user, 'test')
443

    
444
        r = self.get(url)
445
        objects = r.content.split('\n')
446
        if '' in objects:
447
            objects.remove('')
448
        self.assertEqual(objects, ['/objectname'])
449

    
450
        r = self.get('%s?format=json' % url)
451
        try:
452
            objects = json.loads(r.content)
453
        except:
454
            self.fail('json format expected')
455
        self.assertEqual([o['name'] for o in objects], ['/objectname'])
456

    
457
        r = self.get('%s?format=xml' % url)
458
        try:
459
            objects = minidom.parseString(r.content)
460
        except:
461
            self.fail('xml format expected')
462
        self.assertEqual(
463
            [n.firstChild.data for n in objects.getElementsByTagName('name')],
464
            ['/objectname'])
465

    
466
    def test_list_objects_with_limit_marker(self):
467
        cname = self.cnames[0]
468
        url = join_urls(self.pithos_path, self.user, cname)
469
        r = self.get('%s?limit=qwert' % url)
470
        self.assertTrue(r.status_code != 500)
471

    
472
        r = self.get('%s?limit=2' % url)
473
        self.assertEqual(r.status_code, 200)
474
        objects = r.content.split('\n')
475
        if '' in objects:
476
            objects.remove('')
477

    
478
        onames = sorted(self.objects[cname].keys())
479
        self.assertEqual(objects, onames[:2])
480

    
481
        markers = ['How To Win Friends And Influence People.pdf',
482
                   'moms_birthday.jpg']
483
        limit = 4
484
        for m in markers:
485
            r = self.get('%s?limit=%s&marker=%s' % (url, limit, m))
486
            objects = r.content.split('\n')
487
            if '' in objects:
488
                objects.remove('')
489
            start = onames.index(m) + 1
490
            end = start + limit
491
            end = end if len(onames) >= end else len(onames)
492
            self.assertEqual(objects, onames[start:end])
493

    
494
    @pithos_test_settings(API_LIST_LIMIT=10)
495
    def test_list_limit_exceeds(self):
496
        self.create_container('container')
497
        url = join_urls(self.pithos_path, self.user, 'container')
498

    
499
        for _ in range(pithos_settings.API_LIST_LIMIT + 1):
500
            self.upload_object('container')
501

    
502
        r = self.get('%s?format=json' % url)
503
        try:
504
            objects = json.loads(r.content)
505
        except:
506
            self.fail('json format expected')
507
        self.assertEqual(pithos_settings.API_LIST_LIMIT,
508
                         len(objects))
509

    
510
    def test_list_pseudo_hierarchical_folders(self):
511
        url = join_urls(self.pithos_path, self.user, 'apples')
512
        r = self.get('%s?prefix=photos&delimiter=/' % url)
513
        self.assertEqual(r.status_code, 200)
514
        objects = r.content.split('\n')
515
        if '' in objects:
516
            objects.remove('')
517
        self.assertEquals(
518
            ['photos/animals/', 'photos/me.jpg', 'photos/plants/'],
519
            objects)
520

    
521
        r = self.get('%s?prefix=photos/animals&delimiter=/' % url)
522
        objects = r.content.split('\n')
523
        if '' in objects:
524
            objects.remove('')
525
        self.assertEquals(
526
            ['photos/animals/cats/', 'photos/animals/dogs/'], objects)
527

    
528
        r = self.get('%s?path=photos' % url)
529
        objects = r.content.split('\n')
530
        if '' in objects:
531
            objects.remove('')
532
        self.assertEquals(['photos/me.jpg'], objects)
533

    
534
    def test_extended_list_json(self):
535
        url = join_urls(self.pithos_path, self.user, 'apples')
536
        params = {'format': 'json', 'limit': 2, 'prefix': 'photos/animals',
537
                  'delimiter': '/'}
538
        r = self.get('%s?%s' % (url, urlencode(params)))
539
        self.assertEqual(r.status_code, 200)
540
        try:
541
            objects = json.loads(r.content)
542
        except:
543
            self.fail('json format expected')
544
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
545
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
546

    
547
    def test_extended_list_xml(self):
548
        url = join_urls(self.pithos_path, self.user, 'apples')
549
        params = {'format': 'xml', 'limit': 4, 'prefix': 'photos',
550
                  'delimiter': '/'}
551
        r = self.get('%s?%s' % (url, urlencode(params)))
552
        self.assertEqual(r.status_code, 200)
553
        try:
554
            xml = minidom.parseString(r.content)
555
        except:
556
            self.fail('xml format expected')
557
        self.assert_extended(xml, 'xml', 'object', size=4)
558
        dirs = xml.getElementsByTagName('subdir')
559
        self.assertEqual(len(dirs), 2)
560
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
561
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
562

    
563
        objects = xml.getElementsByTagName('name')
564
        self.assertEqual(len(objects), 1)
565
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
566

    
567
    def test_list_meta_double_matching(self):
568
        # update object meta
569
        cname = 'apples'
570
        container_url = join_urls(self.pithos_path, self.user, cname)
571
        oname = self.objects[cname].keys().pop()
572
        meta = {'quality': 'aaa', 'stock': 'true'}
573
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
574
                       for k, v in meta.iteritems())
575
        object_url = join_urls(container_url, oname)
576
        self.post(object_url, content_type='', **headers)
577

    
578
        # list objects that satisfy the criteria
579
        r = self.get('%s?meta=Quality,Stock' % container_url)
580
        self.assertEqual(r.status_code, 200)
581
        objects = r.content.split('\n')
582
        if '' in objects:
583
            objects.remove('')
584
        self.assertEqual(objects, [oname])
585

    
586
    def test_list_using_meta(self):
587
        # update object meta
588
        cname = 'apples'
589
        container_url = join_urls(self.pithos_path, self.user, cname)
590

    
591
        onames = self.objects[cname].keys()
592
        url = join_urls(container_url, onames[0])
593
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
594
        self.assertEqual(r.status_code, 202)
595

    
596
        url = join_urls(container_url, onames[1])
597
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
598
        self.assertEqual(r.status_code, 202)
599

    
600
        url = join_urls(container_url, onames[2])
601
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='100')
602
        self.assertEqual(r.status_code, 202)
603

    
604
        url = join_urls(container_url, onames[3])
605
        r = self.post(url, content_type='', HTTP_X_OBJECT_META_STOCK='200')
606
        self.assertEqual(r.status_code, 202)
607

    
608
        # test multiple existence criteria matches
609
        r = self.get('%s?meta=Quality,Stock' % container_url)
610
        self.assertEqual(r.status_code, 200)
611
        objects = r.content.split('\n')
612
        if '' in objects:
613
            objects.remove('')
614
        self.assertTrue(objects, sorted(onames))
615

    
616
        # list objects that satisfy the existence criteria
617
        r = self.get('%s?meta=Stock' % container_url)
618
        self.assertEqual(r.status_code, 200)
619
        objects = r.content.split('\n')
620
        if '' in objects:
621
            objects.remove('')
622
        self.assertTrue(objects, sorted(onames[2:]))
623

    
624
        # test case insensitive existence criteria matching
625
        r = self.get('%s?meta=quality' % container_url)
626
        self.assertEqual(r.status_code, 200)
627
        objects = r.content.split('\n')
628
        if '' in objects:
629
            objects.remove('')
630
        self.assertTrue(objects, sorted(onames[:2]))
631

    
632
        # test do not all existencecriteria match
633
        r = self.get('%s?meta=Quality,Foo' % container_url)
634
        self.assertEqual(r.status_code, 200)
635
        objects = r.content.split('\n')
636
        if '' in objects:
637
            objects.remove('')
638
        self.assertTrue(objects, sorted(onames[:2]))
639

    
640
        # test equals criteria
641
        r = self.get('%s?meta=%s' % (container_url, quote('Quality=aaa')))
642
        self.assertEqual(r.status_code, 200)
643
        objects = r.content.split('\n')
644
        if '' in objects:
645
            objects.remove('')
646
        self.assertTrue(objects, [onames[0]])
647

    
648
        # test not equals criteria
649
        r = self.get('%s?meta=%s' % (container_url, quote('Quality!=aaa')))
650
        self.assertEqual(r.status_code, 200)
651
        objects = r.content.split('\n')
652
        if '' in objects:
653
            objects.remove('')
654
        self.assertTrue(objects, [onames[1]])
655

    
656
        # test lte criteria
657
        r = self.get('%s?meta=%s' % (container_url, quote('Stock<=120')))
658
        self.assertEqual(r.status_code, 200)
659
        objects = r.content.split('\n')
660
        if '' in objects:
661
            objects.remove('')
662
        self.assertTrue(objects, [onames[2]])
663

    
664
        # test gte criteria
665
        r = self.get('%s?meta=%s' % (container_url, quote('Stock>=200')))
666
        self.assertEqual(r.status_code, 200)
667
        objects = r.content.split('\n')
668
        if '' in objects:
669
            objects.remove('')
670
        self.assertTrue(objects, [onames[3]])
671

    
672
    def test_if_modified_since(self):
673
        cname = 'apples'
674
        container_info = self.get_container_info(cname)
675
        last_modified = container_info['Last-Modified']
676
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
677
        t1_formats = map(t1.strftime, DATE_FORMATS)
678

    
679
        # Check not modified
680
        url = join_urls(self.pithos_path, self.user, cname)
681
        for t in t1_formats:
682
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
683
            self.assertEqual(r.status_code, 304)
684

    
685
        # modify account: add container
686
        _time.sleep(1)
687
        oname = self.upload_object(cname)[0]
688

    
689
        # Check modified
690
        objects = self.objects[cname].keys()
691
        objects.append(oname)
692
        for t in t1_formats:
693
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
694
            self.assertEqual(r.status_code, 200)
695
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
696

    
697
        container_info = self.get_container_info(cname)
698
        last_modified = container_info['Last-Modified']
699
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
700
        t2_formats = map(t2.strftime, DATE_FORMATS)
701

    
702
        # modify account: update account meta
703
        _time.sleep(1)
704
        self.update_container_meta(cname, {'foo': 'bar'})
705

    
706
        # Check modified
707
        for t in t2_formats:
708
            r = self.get(url, HTTP_IF_MODIFIED_SINCE=t)
709
            self.assertEqual(r.status_code, 200)
710
            self.assertEqual(r.content.split('\n')[:-1], sorted(objects))
711

    
712
    def test_if_modified_since_invalid_date(self):
713
        cname = 'apples'
714
        url = join_urls(self.pithos_path, self.user, cname)
715
        r = self.get(url, HTTP_IF_MODIFIED_SINCE='Monday')
716
        self.assertEqual(r.status_code, 200)
717
        self.assertEqual(r.content.split('\n')[:-1],
718
                         sorted(self.objects['apples'].keys()))
719

    
720
    def test_if_not_modified_since(self):
721
        cname = 'apples'
722
        url = join_urls(self.pithos_path, self.user, cname)
723
        container_info = self.get_container_info(cname)
724
        last_modified = container_info['Last-Modified']
725
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
726

    
727
        # Check unmodified
728
        t1 = t + datetime.timedelta(seconds=1)
729
        t1_formats = map(t1.strftime, DATE_FORMATS)
730
        for t in t1_formats:
731
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
732
            self.assertEqual(r.status_code, 200)
733
            self.assertEqual(
734
                r.content.split('\n')[:-1],
735
                sorted(self.objects['apples']))
736

    
737
        # modify account: add container
738
        _time.sleep(2)
739
        self.upload_object(cname)
740

    
741
        container_info = self.get_container_info(cname)
742
        last_modified = container_info['Last-Modified']
743
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
744
        t2 = t - datetime.timedelta(seconds=1)
745
        t2_formats = map(t2.strftime, DATE_FORMATS)
746

    
747
        # Check modified
748
        for t in t2_formats:
749
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
750
            self.assertEqual(r.status_code, 412)
751

    
752
        # modify account: update account meta
753
        _time.sleep(1)
754
        self.update_container_meta(cname, {'foo': 'bar'})
755

    
756
        container_info = self.get_container_info(cname)
757
        last_modified = container_info['Last-Modified']
758
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
759
        t3 = t - datetime.timedelta(seconds=1)
760
        t3_formats = map(t3.strftime, DATE_FORMATS)
761

    
762
        # Check modified
763
        for t in t3_formats:
764
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=t)
765
            self.assertEqual(r.status_code, 412)
766

    
767
    def test_if_unmodified_since(self):
768
        cname = 'apples'
769
        url = join_urls(self.pithos_path, self.user, cname)
770
        container_info = self.get_container_info(cname)
771
        last_modified = container_info['Last-Modified']
772
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
773
        t = t + datetime.timedelta(seconds=1)
774
        t_formats = map(t.strftime, DATE_FORMATS)
775

    
776
        for tf in t_formats:
777
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
778
            self.assertEqual(r.status_code, 200)
779
            self.assertEqual(
780
                r.content.split('\n')[:-1],
781
                sorted(self.objects['apples']))
782

    
783
    def test_if_unmodified_since_precondition_failed(self):
784
        cname = 'apples'
785
        url = join_urls(self.pithos_path, self.user, cname)
786
        container_info = self.get_container_info(cname)
787
        last_modified = container_info['Last-Modified']
788
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
789
        t = t - datetime.timedelta(seconds=1)
790
        t_formats = map(t.strftime, DATE_FORMATS)
791

    
792
        for tf in t_formats:
793
            r = self.get(url, HTTP_IF_UNMODIFIED_SINCE=tf)
794
            self.assertEqual(r.status_code, 412)
795

    
796

    
797
class ContainerPut(PithosAPITest):
798
    def test_create(self):
799
        self.create_container('c1')
800
        self.list_containers()
801
        self.assertTrue('c1' in self.list_containers(format=None))
802

    
803
    def test_create_twice(self):
804
        self.create_container('c1')
805
        self.assertTrue('c1' in self.list_containers(format=None))
806
        r = self.create_container('c1')[-1]
807
        self.assertEqual(r.status_code, 202)
808
        self.assertTrue('c1' in self.list_containers(format=None))
809

    
810

    
811
class ContainerPost(PithosAPITest):
812
    def test_update_meta(self):
813
        cname = 'apples'
814
        self.create_container(cname)
815
        meta = {'test': 'test33', 'tost': 'tost22'}
816
        self.update_container_meta(cname, meta)
817
        info = self.get_container_info(cname)
818
        for k, v in meta.items():
819
            k = 'x-container-meta-%s' % k
820
            self.assertTrue(k in info)
821
            self.assertEqual(info[k], v)
822

    
823
    def test_quota(self):
824
        self.create_container('c1')
825

    
826
        url = join_urls(self.pithos_path, self.user, 'c1')
827
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='100')
828
        self.assertEqual(r.status_code, 202)
829

    
830
        info = self.get_container_info('c1')
831
        self.assertTrue('x-container-policy-quota' in info)
832
        self.assertEqual(info['x-container-policy-quota'], '100')
833

    
834
        r = self.upload_object('c1', length=101, verify_status=False)[2]
835
        self.assertEqual(r.status_code, 413)
836

    
837
        url = join_urls(self.pithos_path, self.user, 'c1')
838
        r = self.post(url, HTTP_X_CONTAINER_POLICY_QUOTA='0')
839
        self.assertEqual(r.status_code, 202)
840

    
841
        r = self.upload_object('c1', length=1)
842

    
843

    
844
class ContainerDelete(PithosAPITest):
845
    def setUp(self):
846
        PithosAPITest.setUp(self)
847
        cnames = ['c1', 'c2']
848

    
849
        for c in cnames:
850
            self.create_container(c)
851

    
852
    def test_delete(self):
853
        url = join_urls(self.pithos_path, self.user, 'c1')
854
        r = self.delete(url)
855
        self.assertEqual(r.status_code, 204)
856
        self.assertTrue('c1' not in self.list_containers(format=None))
857

    
858
    def test_delete_non_empty(self):
859
        self.upload_object('c1')
860
        url = join_urls(self.pithos_path, self.user, 'c1')
861
        r = self.delete(url)
862
        self.assertEqual(r.status_code, 409)
863
        self.assertTrue('c1' in self.list_containers(format=None))
864

    
865
    def test_delete_invalid(self):
866
        url = join_urls(self.pithos_path, self.user, 'c3')
867
        r = self.delete(url)
868
        self.assertEqual(r.status_code, 404)
869

    
870
    def test_delete_contents(self):
871
        folder = self.create_folder('c1')[0]
872
        descendant = strnextling(folder)
873
        self.upload_object('c1', descendant)
874
        self.create_folder('c1', '%s/%s' % (folder, get_random_data(5)))[0]
875

    
876
        self.delete('%s?delimiter=/' % join_urls(
877
            self.pithos_path, self.user, 'c1'))
878
        self.assertEqual([], self.list_objects('c1'))
879
        self.assertTrue('c1' in self.list_containers(format=None))