Revision 3cdb2b79

b/snf-common/synnefo/settings/test.py
8 8

  
9 9
DATABASES = {
10 10
    'default': {
11
        'ENGINE': 'sqlite3',
12
        'NAME': '/tmp/synnefo_test_db.sqlite',
11
        #'ENGINE': 'django.db.backends.sqlite3',
12
        #'NAME': '/tmp/synnefo_test_db.sqlite',
13

  
14
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
15
        'NAME': 'pithos',
16
        'USER': 'postgres',
17
        'PORT': '5432',
13 18
    }
14 19
}
15 20

  
b/snf-django-lib/snf_django/utils/testing.py
116 116
        return inner
117 117
    return wrapper
118 118

  
119
serial = 0
120

  
119 121

  
120 122
@contextmanager
121 123
def astakos_user(user):
......
131 133
        get_token.return_value = "DummyToken"
132 134
        with patch('astakosclient.AstakosClient.get_user_info') as m:
133 135
            m.return_value = {"uuid": user}
134
            yield
135

  
136
serial = 0
136
            with patch('astakosclient.AstakosClient.get_quotas') as m2:
137
                m2.return_value = {
138
                    "system": {
139
                        "pithos.diskspace": {
140
                            "usage": 0,
141
                            "limit": 1073741824,
142
                            "pending": 0
143
                        }
144
                    }
145
                }
146
                with patch('astakosclient.AstakosClient.issue_one_commission') as m3:
147
                    serials = []
148
                    append = serials.append
149

  
150
                    def get_serial(*args, **kwargs):
151
                        global serial
152
                        serial += 1
153
                        append(serial)
154
                        return serial
155

  
156
                    m3.side_effect = get_serial
157
                    with patch('astakosclient.AstakosClient.resolve_commissions') as m4:
158
                        m4.return_value = {'accepted': serials,
159
                                           'rejected': [],
160
                                           'failed': []}
161
                        with patch('astakosclient.AstakosClient.get_usernames') as m5:
162

  
163
                            def get_usernames(*args, **kwargs):
164
                                uuids = args[-1]
165
                                return dict((uuid, uuid) for uuid in uuids)
166

  
167
                            m5.side_effect = get_usernames
168
                            yield
137 169

  
138 170

  
139 171
@contextmanager
b/snf-pithos-app/pithos/api/test/__init__.py
1
#!/usr/bin/env python
2
#coding=utf8
3

  
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

  
37
from urlparse import urlunsplit, urlsplit
38
from xml.dom import minidom
39

  
40
from snf_django.utils.testing import with_settings, astakos_user
41

  
42
from pithos.backends.random_word import get_random_word
43
from pithos.api import settings as pithos_settings
44

  
45
from django.test import TestCase
46
from django.utils.http import urlencode
47
from django.conf import settings
48

  
49
import django.utils.simplejson as json
50

  
51
import re
52
import random
53
import threading
54
import functools
55

  
56
pithos_test_settings = functools.partial(with_settings, pithos_settings)
57

  
58
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
59
                "%A, %d-%b-%y %H:%M:%S GMT",
60
                "%a, %d %b %Y %H:%M:%S GMT"]
61

  
62
o_names = ['kate.jpg',
63
           'kate_beckinsale.jpg',
64
           'How To Win Friends And Influence People.pdf',
65
           'moms_birthday.jpg',
66
           'poodle_strut.mov',
67
           'Disturbed - Down With The Sickness.mp3',
68
           'army_of_darkness.avi',
69
           'the_mad.avi',
70
           'photos/animals/dogs/poodle.jpg',
71
           'photos/animals/dogs/terrier.jpg',
72
           'photos/animals/cats/persian.jpg',
73
           'photos/animals/cats/siamese.jpg',
74
           'photos/plants/fern.jpg',
75
           'photos/plants/rose.jpg',
76
           'photos/me.jpg']
77

  
78
details = {'container': ('name', 'count', 'bytes', 'last_modified',
79
                         'x_container_policy'),
80
           'object': ('name', 'hash', 'bytes', 'content_type',
81
                      'content_encoding', 'last_modified',)}
82

  
83
return_codes = (400, 401, 403, 404, 503)
84

  
85

  
86
class PithosAPITest(TestCase):
87
    #TODO unauthorized request
88
    def setUp(self):
89
        pithos_settings.BACKEND_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
90
        pithos_settings.BACKEND_DB_CONNECTION = construct_db_connection()
91
        pithos_settings.BACKEND_POOL_SIZE = 1
92
        self.user = 'user'
93

  
94
    def tearDown(self):
95
        #delete additionally created metadata
96
        meta = self.get_account_meta()
97
        self.delete_account_meta(meta)
98

  
99
        #delete additionally created groups
100
        groups = self.get_account_groups()
101
        self.delete_account_groups(groups)
102

  
103
        self._clean_account()
104

  
105
    def head(self, url, user='user', *args, **kwargs):
106
        with astakos_user(user):
107
            response = self.client.head(url, *args, **kwargs)
108
        return response
109

  
110
    def get(self, url, user='user', *args, **kwargs):
111
        with astakos_user(user):
112
            response = self.client.get(url, *args, **kwargs)
113
        return response
114

  
115
    def delete(self, url, user='user', *args, **kwargs):
116
        with astakos_user(user):
117
            response = self.client.delete(url, *args, **kwargs)
118
        return response
119

  
120
    def post(self, url, user='user', *args, **kwargs):
121
        with astakos_user(user):
122
            kwargs.setdefault('content_type', 'application/octet-stream')
123
            response = self.client.post(url, *args, **kwargs)
124
        return response
125

  
126
    def put(self, url, user='user', *args, **kwargs):
127
        with astakos_user(user):
128
            kwargs.setdefault('content_type', 'application/octet-stream')
129
            response = self.client.put(url, *args, **kwargs)
130
        return response
131

  
132
    def _clean_account(self):
133
        for c in self.list_containers():
134
            self.delete_container_content(c['name'])
135
            self.delete_container(c['name'])
136

  
137
    def update_account_meta(self, meta):
138
        kwargs = dict(
139
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
140
        r = self.post('/v1/%s?update=' % self.user, **kwargs)
141
        self.assertEqual(r.status_code, 202)
142
        account_meta = self.get_account_meta()
143
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
144
            k in meta.keys())
145
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
146
            k, v in meta.items())
147

  
148
    def reset_account_meta(self, meta):
149
        kwargs = dict(
150
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
151
        r = self.post('/v1/%s' % self.user, **kwargs)
152
        self.assertEqual(r.status_code, 202)
153
        account_meta = self.get_account_meta()
154
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
155
            k in meta.keys())
156
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
157
            k, v in meta.items())
158

  
159
    def delete_account_meta(self, meta):
160
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
161
        kwargs = dict((transform(k), '') for k, v in meta.items())
162
        r = self.post('/v1/%s?update=' % self.user, **kwargs)
163
        self.assertEqual(r.status_code, 202)
164
        account_meta = self.get_account_meta()
165
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
166
            k in meta.keys())
167
        return r
168

  
169
    def delete_account_groups(self, groups):
170
        r = self.post('/v1/%s?update=' % self.user, **groups)
171
        self.assertEqual(r.status_code, 202)
172
        return r
173

  
174
    def get_account_info(self, until=None):
175
        url = '/v1/%s' % self.user
176
        if until is not None:
177
            parts = list(urlsplit(url))
178
            parts[3] = urlencode({
179
                'until': until
180
            })
181
            url = urlunsplit(parts)
182
        r = self.head(url)
183
        self.assertEqual(r.status_code, 204)
184
        return r
185

  
186
    def get_account_meta(self, until=None):
187
        r = self.get_account_info(until=until)
188
        headers = dict(r._headers.values())
189
        map(headers.pop,
190
            [k for k in headers.keys()
191
                if not k.startswith('X-Account-Meta-')])
192
        return headers
193

  
194
    def get_account_groups(self, until=None):
195
        r = self.get_account_info(until=until)
196
        headers = dict(r._headers.values())
197
        map(headers.pop,
198
            [k for k in headers.keys()
199
                if not k.startswith('X-Account-Group-')])
200
        return headers
201

  
202
    def list_containers(self, format='json', headers={}, **params):
203
        url = '/v1/%s' % self.user
204
        parts = list(urlsplit(url))
205
        params['format'] = format
206
        parts[3] = urlencode(params)
207
        url = urlunsplit(parts)
208
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
209
                        for k, v in headers.items())
210
        r = self.get(url, **_headers)
211

  
212
        if format is None:
213
            containers = r.content.split('\n')
214
            if '' in containers:
215
                containers.remove('')
216
            return containers
217
        elif format == 'json':
218
            try:
219
                containers = json.loads(r.content)
220
            except:
221
                self.fail('json format expected')
222
            return containers
223
        elif format == 'xml':
224
            return minidom.parseString(r.content)
225

  
226
    def delete_container_content(self, cname):
227
        r = self.delete('/v1/%s/%s?delimiter=/' % (self.user, cname))
228
        self.assertEqual(r.status_code, 204)
229
        return r
230

  
231
    def delete_container(self, cname):
232
        r = self.delete('/v1/%s/%s' % (self.user, cname))
233
        self.assertEqual(r.status_code, 204)
234
        return r
235

  
236
    def create_container(self, cname):
237
        r = self.put('/v1/%s/%s' % (self.user, cname), data='')
238
        self.assertTrue(r.status_code in (202, 201))
239
        return r
240

  
241
    def upload_object(self, cname, oname=None, **meta):
242
        oname = oname or get_random_word(8)
243
        data = get_random_word(length=random.randint(1, 1024))
244
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
245
                       for k, v in meta.iteritems())
246
        r = self.put('/v1/%s/%s/%s' % (
247
            self.user, cname, oname), data=data, **headers)
248
        self.assertEqual(r.status_code, 201)
249
        return oname, data, r
250

  
251
    def create_folder(self, cname, oname=get_random_word(8), **headers):
252
        r = self.put('/v1/%s/%s/%s' % (
253
            self.user, cname, oname), data='',
254
            content_type='application/directory',
255
            **headers)
256
        self.assertEqual(r.status_code, 201)
257
        return oname, r
258

  
259
    def list_objects(self, cname):
260
        r = self.get('/v1/%s/%s?format=json' % (self.user, cname))
261
        self.assertTrue(r.status_code in (200, 204))
262
        try:
263
            objects = json.loads(r.content)
264
        except:
265
            self.fail('json format expected')
266
        return objects
267

  
268
    def assert_status(self, status, codes):
269
        l = [elem for elem in return_codes]
270
        if isinstance(codes, list):
271
            l.extend(codes)
272
        else:
273
            l.append(codes)
274
        self.assertTrue(status in l)
275

  
276
    def assert_extended(self, data, format, type, size=10000):
277
        if format == 'xml':
278
            self._assert_xml(data, type, size)
279
        elif format == 'json':
280
            self._assert_json(data, type, size)
281

  
282
    def _assert_json(self, data, type, size):
283
        convert = lambda s: s.lower()
284
        info = [convert(elem) for elem in details[type]]
285
        self.assertTrue(len(data) <= size)
286
        for item in info:
287
            for i in data:
288
                if 'subdir' in i.keys():
289
                    continue
290
                self.assertTrue(item in i.keys())
291

  
292
    def _assert_xml(self, data, type, size):
293
        convert = lambda s: s.lower()
294
        info = [convert(elem) for elem in details[type]]
295
        try:
296
            info.remove('content_encoding')
297
        except ValueError:
298
            pass
299
        xml = data
300
        entities = xml.getElementsByTagName(type)
301
        self.assertTrue(len(entities) <= size)
302
        for e in entities:
303
            for item in info:
304
                self.assertTrue(e.getElementsByTagName(item))
305

  
306

  
307
class AssertMappingInvariant(object):
308
    def __init__(self, callable, *args, **kwargs):
309
        self.callable = callable
310
        self.args = args
311
        self.kwargs = kwargs
312

  
313
    def __enter__(self):
314
        self.map = self.callable(*self.args, **self.kwargs)
315
        return self.map
316

  
317
    def __exit__(self, type, value, tb):
318
        map = self.callable(*self.args, **self.kwargs)
319
        for k, v in self.map.items():
320
            if is_date(v):
321
                continue
322

  
323
            assert(k in map), '%s not in map' % k
324
            assert v == map[k]
325

  
326
django_sqlalchemy_engines = {
327
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
328
    'django.db.backends.postgresql': 'postgresql',
329
    'django.db.backends.mysql': '',
330
    'django.db.backends.sqlite3': 'mssql',
331
    'django.db.backends.oracle': 'oracle'}
332

  
333

  
334
def construct_db_connection():
335
    """Convert the django default database to an sqlalchemy connection
336
       string"""
337
    db = settings.DATABASES['default']
338
    if db['ENGINE'] == 'django.db.backends.sqlite3':
339
        return 'sqlite://'
340
    else:
341
        d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
342
                 user=db['USER'],
343
                 pwd=db['PASSWORD'],
344
                 host=db['HOST'].lower(),
345
                 port=int(db['PORT']) if db['PORT'] != '' else '',
346
                 name=db['NAME'])
347
        return '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d
348

  
349

  
350
def is_date(date):
351
    __D = r'(?P<day>\d{2})'
352
    __D2 = r'(?P<day>[ \d]\d)'
353
    __M = r'(?P<mon>\w{3})'
354
    __Y = r'(?P<year>\d{4})'
355
    __Y2 = r'(?P<year>\d{2})'
356
    __T = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})'
357
    RFC1123_DATE = re.compile(r'^\w{3}, %s %s %s %s GMT$' % (
358
        __D, __M, __Y, __T))
359
    RFC850_DATE = re.compile(r'^\w{6,9}, %s-%s-%s %s GMT$' % (
360
        __D, __M, __Y2, __T))
361
    ASCTIME_DATE = re.compile(r'^\w{3} %s %s %s %s$' % (
362
        __M, __D2, __T, __Y))
363
    for regex in RFC1123_DATE, RFC850_DATE, ASCTIME_DATE:
364
        m = regex.match(date)
365
        if m is not None:
366
            return True
367
    return False
368

  
369

  
370
def strnextling(prefix):
371
    """Return the first unicode string
372
       greater than but not starting with given prefix.
373
       strnextling('hello') -> 'hellp'
374
    """
375
    if not prefix:
376
        ## all strings start with the null string,
377
        ## therefore we have to approximate strnextling('')
378
        ## with the last unicode character supported by python
379
        ## 0x10ffff for wide (32-bit unicode) python builds
380
        ## 0x00ffff for narrow (16-bit unicode) python builds
381
        ## We will not autodetect. 0xffff is safe enough.
382
        return unichr(0xffff)
383
    s = prefix[:-1]
384
    c = ord(prefix[-1])
385
    if c >= 0xffff:
386
        raise RuntimeError
387
    s += unichr(c + 1)
388
    return s
389

  
390

  
391
def test_concurrently(times=2):
392
    """
393
    Add this decorator to small pieces of code that you want to test
394
    concurrently to make sure they don't raise exceptions when run at the
395
    same time.  E.g., some Django views that do a SELECT and then a subsequent
396
    INSERT might fail when the INSERT assumes that the data has not changed
397
    since the SELECT.
398
    """
399
    def test_concurrently_decorator(test_func):
400
        def wrapper(*args, **kwargs):
401
            exceptions = []
402

  
403
            def call_test_func():
404
                try:
405
                    test_func(*args, **kwargs)
406
                except Exception, e:
407
                    exceptions.append(e)
408
                    raise
409

  
410
            threads = []
411
            for i in range(times):
412
                threads.append(threading.Thread())
413
            for t in threads:
414
                t.start()
415
            for t in threads:
416
                t.join()
417
            if exceptions:
418
                raise Exception(
419
                    ('test_concurrently intercepted %s',
420
                     'exceptions: %s') % (len(exceptions), exceptions))
421
        return wrapper
422
    return test_concurrently_decorator
b/snf-pithos-app/pithos/api/test/accounts.py
1
#!/usr/bin/env python
2
#coding=utf8
3

  
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

  
37
from pithos.api.test import PithosAPITest, AssertMappingInvariant,\
38
    DATE_FORMATS
39

  
40
import time as _time
41
import datetime
42

  
43

  
44
class AccountHead(PithosAPITest):
45
    def test_get_account_meta(self):
46
        cnames = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
47

  
48
        # create containers
49
        uploaded_bytes = 0
50
        for cname in cnames:
51
            self.create_container(cname)
52

  
53
            # upload object
54
            name, data, resp = self.upload_object(cname)
55
            uploaded_bytes += len(data)
56

  
57
        # set account meta
58
        self.update_account_meta({'foo': 'bar'})
59

  
60
        account_info = self.get_account_info()
61
        self.assertTrue('X-Account-Meta-Foo' in account_info)
62
        self.assertEqual(account_info['X-Account-Meta-Foo'], 'bar')
63

  
64
        # list containers
65
        containers = self.list_containers()
66
        self.assertEqual(int(account_info['X-Account-Container-Count']),
67
                         len(containers))
68
        usage = 0
69
        for c in containers:
70
            # list objects
71
            objects = self.list_objects(c['name'])
72
            self.assertEqual(c['count'], len(objects))
73
            csum = sum([o['bytes'] for o in objects])
74
            self.assertEqual(int(c['bytes']), csum)
75
            usage += int(c['bytes'])
76

  
77
        self.assertEqual(
78
            int(account_info['x-account-bytes-used']) + uploaded_bytes,
79
            usage)
80

  
81
    def test_get_account_meta_until(self):
82
        self.update_account_meta({'foo': 'bar'})
83

  
84
        account_info = self.get_account_info()
85
        t = datetime.datetime.strptime(account_info['Last-Modified'],
86
                                       DATE_FORMATS[2])
87
        t1 = t + datetime.timedelta(seconds=1)
88
        until = int(_time.mktime(t1.timetuple()))
89

  
90
        _time.sleep(2)
91
        self.update_account_meta({'quality': 'AAA'})
92

  
93
        account_info = self.get_account_info()
94
        t = datetime.datetime.strptime(account_info['Last-Modified'],
95
                                       DATE_FORMATS[-1])
96
        last_modified = int(_time.mktime(t.timetuple()))
97
        assert until < last_modified
98

  
99
        self.assertTrue('X-Account-Meta-Quality' in account_info)
100
        self.assertTrue('X-Account-Meta-Foo' in account_info)
101

  
102
        account_info = self.get_account_info(until=until)
103
        self.assertTrue('X-Account-Meta-Quality' not in account_info)
104
        self.assertTrue('X-Account-Meta-Foo' in account_info)
105
        self.assertTrue('X-Account-Until-Timestamp' in account_info)
106
        t = datetime.datetime.strptime(
107
            account_info['X-Account-Until-Timestamp'], DATE_FORMATS[2])
108
        self.assertTrue(int(_time.mktime(t1.timetuple())) <= until)
109

  
110
    def test_get_account_meta_until_invalid_date(self):
111
        self.update_account_meta({'quality': 'AAA'})
112
        meta = self.get_account_meta(until='-1')
113
        self.assertTrue('X-Account-Meta-Quality' in meta)
114

  
115

  
116
class AccountGet(PithosAPITest):
117
    def setUp(self):
118
        PithosAPITest.setUp(self)
119
        cnames = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
120

  
121
        # create containers
122
        uploaded_bytes = 0
123
        for cname in cnames:
124
            self.create_container(cname)
125

  
126
            # upload object
127
            name, data, resp = self.upload_object(cname)
128
            uploaded_bytes += len(data)
129

  
130
    def test_list(self):
131
        #list containers: row format
132
        containers = self.list_containers(format=None)
133
        self.assertEquals(containers,
134
                          ['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
135

  
136
    def test_list_with_limit(self):
137
        containers = self.list_containers(format=None, limit=2)
138
        self.assertEquals(len(containers), 2)
139
        self.assertEquals(containers, ['apples', 'bananas'])
140

  
141
    def test_list_with_marker(self):
142
        containers = self.list_containers(format=None, limit=2,
143
                                          marker='bananas')
144
        self.assertEquals(containers, ['kiwis', 'oranges'])
145

  
146
        containers = self.list_containers(format=None, limit=2,
147
                                          marker='oranges')
148
        self.assertEquals(containers, ['pears'])
149

  
150
    def test_list_json_with_marker(self):
151
        containers = self.list_containers(format='json', limit=2,
152
                                          marker='bananas')
153
        self.assert_extended(containers, 'json', 'container', 2)
154
        self.assertEqual(containers[0]['name'], 'kiwis')
155
        self.assertEqual(containers[1]['name'], 'oranges')
156

  
157
        containers = self.list_containers(format='json', limit=2,
158
                                          marker='oranges')
159
        self.assert_extended(containers, 'json', 'container', 1)
160
        self.assertEqual(containers[0]['name'], 'pears')
161

  
162
    def test_list_xml_with_marker(self):
163
        xml = self.list_containers(format='xml', limit=2, marker='bananas')
164
        self.assert_extended(xml, 'xml', 'container', 2)
165
        nodes = xml.getElementsByTagName('name')
166
        self.assertTrue(len(nodes) <= 2)
167
        names = [n.childNodes[0].data for n in nodes]
168
        self.assertEqual(names, ['kiwis', 'oranges'])
169

  
170
        xml = self.list_containers(format='xml', limit=2, marker='oranges')
171
        self.assert_extended(xml, 'xml', 'container', 1)
172
        nodes = xml.getElementsByTagName('name')
173
        self.assertTrue(len(nodes) <= 2)
174
        names = [n.childNodes[0].data for n in nodes]
175
        self.assertEqual(names, ['pears'])
176

  
177
    def test_if_modified_since(self):
178
        account_info = self.get_account_info()
179
        last_modified = account_info['Last-Modified']
180
        t1 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
181
        t1_formats = map(t1.strftime, DATE_FORMATS)
182

  
183
        # Check not modified
184
        for t in t1_formats:
185
            r = self.get('/v1/%s' % self.user, HTTP_IF_MODIFIED_SINCE=t)
186
            self.assertEqual(r.status_code, 304)
187

  
188
        # modify account: add container
189
        _time.sleep(1)
190
        self.create_container('c1')
191

  
192
        # Check modified
193
        for t in t1_formats:
194
            r = self.get('/v1/%s' % self.user, HTTP_IF_MODIFIED_SINCE=t)
195
            self.assertEqual(r.status_code, 200)
196
            self.assertEqual(
197
                r.content.split('\n')[:-1],
198
                ['apples', 'bananas', 'c1', 'kiwis', 'oranges', 'pears'])
199

  
200
        account_info = self.get_account_info()
201
        last_modified = account_info['Last-Modified']
202
        t2 = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
203
        t2_formats = map(t2.strftime, DATE_FORMATS)
204

  
205
        # modify account: update account meta
206
        _time.sleep(1)
207
        self.update_account_meta({'foo': 'bar'})
208

  
209
        # Check modified
210
        for t in t2_formats:
211
            r = self.get('/v1/%s' % self.user, HTTP_IF_MODIFIED_SINCE=t)
212
            self.assertEqual(r.status_code, 200)
213
            self.assertEqual(
214
                r.content.split('\n')[:-1],
215
                ['apples', 'bananas', 'c1', 'kiwis', 'oranges', 'pears'])
216

  
217
    def test_if_modified_since_invalid_date(self):
218
        r = self.get('/v1/%s' % self.user, HTTP_IF_MODIFIED_SINCE='Monday')
219
        self.assertEqual(r.status_code, 200)
220
        self.assertEqual(
221
            r.content.split('\n')[:-1],
222
            ['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
223

  
224
    def test_if_not_modified_since(self):
225
        account_info = self.get_account_info()
226
        last_modified = account_info['Last-Modified']
227
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
228

  
229
        # Check unmodified
230
        t1 = t + datetime.timedelta(seconds=1)
231
        t1_formats = map(t1.strftime, DATE_FORMATS)
232
        for t in t1_formats:
233
            r = self.get('/v1/%s' % self.user, HTTP_IF_UNMODIFIED_SINCE=t)
234
            self.assertEqual(r.status_code, 200)
235
            self.assertEqual(
236
                r.content.split('\n')[:-1],
237
                ['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
238

  
239
        # modify account: add container
240
        _time.sleep(2)
241
        self.create_container('c1')
242

  
243
        account_info = self.get_account_info()
244
        last_modified = account_info['Last-Modified']
245
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
246
        t2 = t - datetime.timedelta(seconds=1)
247
        t2_formats = map(t2.strftime, DATE_FORMATS)
248

  
249
        # Check modified
250
        for t in t2_formats:
251
            r = self.get('/v1/%s' % self.user, HTTP_IF_UNMODIFIED_SINCE=t)
252
            self.assertEqual(r.status_code, 412)
253

  
254
        # modify account: update account meta
255
        _time.sleep(1)
256
        self.update_account_meta({'foo': 'bar'})
257

  
258
        account_info = self.get_account_info()
259
        last_modified = account_info['Last-Modified']
260
        t = datetime.datetime.strptime(last_modified, DATE_FORMATS[-1])
261
        t3 = t - datetime.timedelta(seconds=1)
262
        t3_formats = map(t3.strftime, DATE_FORMATS)
263

  
264
        # Check modified
265
        for t in t3_formats:
266
            r = self.get('/v1/%s' % self.user, HTTP_IF_UNMODIFIED_SINCE=t)
267
            self.assertEqual(r.status_code, 412)
268

  
269
    def test_if_unmodified_since_invalid_date(self):
270
        r = self.get('/v1/%s' % self.user, HTTP_IF_UNMODIFIED_SINCE='Monday')
271
        self.assertEqual(r.status_code, 200)
272
        self.assertEqual(
273
            r.content.split('\n')[:-1],
274
            ['apples', 'bananas', 'kiwis', 'oranges', 'pears'])
275

  
276

  
277
class AccountPost(PithosAPITest):
278
    def setUp(self):
279
        PithosAPITest.setUp(self)
280
        cnames = ['apples', 'bananas', 'kiwis', 'oranges', 'pears']
281

  
282
        # create containers
283
        uploaded_bytes = 0
284
        for cname in cnames:
285
            self.create_container(cname)
286

  
287
            # upload object
288
            name, data, resp = self.upload_object(cname)
289
            uploaded_bytes += len(data)
290

  
291
        # set account meta
292
        self.update_account_meta({'foo': 'bar'})
293

  
294
    def test_update_meta(self):
295
        with AssertMappingInvariant(self.get_account_groups):
296
            initial = self.get_account_meta()
297

  
298
            meta = {'test': 'tost', 'ping': 'pong'}
299
            kwargs = dict(('HTTP_X_ACCOUNT_META_%s' % k, str(v))
300
                          for k, v in meta.items())
301
            r = self.post('/v1/%s?update=' % self.user, **kwargs)
302
            self.assertEqual(r.status_code, 202)
303

  
304
            meta.update(initial)
305
            account_meta = self.get_account_meta()
306
            (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
307
                k in meta.keys())
308
            (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
309
                k, v in meta.items())
310

  
311
    def test_reset_meta(self):
312
        with AssertMappingInvariant(self.get_account_groups):
313
            meta = {'test': 'tost', 'ping': 'pong'}
314
            self.update_account_meta(meta)
315

  
316
            new_meta = {'test': 'test33'}
317
            kwargs = dict((
318
                'HTTP_X_ACCOUNT_META_%s' % k, str(v)
319
            ) for k, v in new_meta.items())
320
            r = self.post('/v1/%s' % self.user, **kwargs)
321
            self.assertEqual(r.status_code, 202)
322

  
323
            account_meta = self.get_account_meta()
324
            (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
325
                k in new_meta.keys())
326
            (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
327
                k, v in new_meta.items())
328

  
329
            (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
330
                k in meta.keys())
331

  
332
    def test_delete_meta(self):
333
        with AssertMappingInvariant(self.get_account_groups):
334
            meta = {'test': 'tost', 'ping': 'pong'}
335
            self.update_account_meta(meta)
336

  
337
            kwargs = dict(
338
                ('HTTP_X_ACCOUNT_META_%s' % k, '') for k, v in meta.items())
339
            r = self.post('/v1/%s?update=' % self.user, **kwargs)
340
            self.assertEqual(r.status_code, 202)
341

  
342
            account_meta = self.get_account_meta()
343

  
344
            (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
345
                k in meta.keys())
346

  
347
    def test_set_account_groups(self):
348
        with AssertMappingInvariant(self.get_account_meta):
349
            pithosdevs = ['verigak', 'gtsouk', 'chazapis']
350
            r = self.post(
351
                '/v1/%s?update=' % self.user,
352
                HTTP_X_ACCOUNT_GROUP_PITHOSDEV=','.join(pithosdevs))
353
            self.assertEqual(r.status_code, 202)
354

  
355
            account_groups = self.get_account_groups()
356
            self.assertTrue(
357
                'X-Account-Group-Pithosdev' in self.get_account_groups())
358
            self.assertEqual(
359
                account_groups['X-Account-Group-Pithosdev'],
360
                ','.join(sorted(pithosdevs)))
361

  
362
            clientdevs = ['pkanavos', 'mvasilak']
363
            r = self.post(
364
                '/v1/%s?update=' % self.user,
365
                HTTP_X_ACCOUNT_GROUP_CLIENTSDEV=','.join(clientdevs))
366
            self.assertEqual(r.status_code, 202)
367

  
368
            account_groups = self.get_account_groups()
369
            self.assertTrue(
370
                'X-Account-Group-Pithosdev' in account_groups)
371
            self.assertTrue(
372
                'X-Account-Group-Clientsdev' in account_groups)
373
            self.assertEqual(
374
                account_groups['X-Account-Group-Pithosdev'],
375
                ','.join(sorted(pithosdevs)))
376
            self.assertEqual(
377
                account_groups['X-Account-Group-Clientsdev'],
378
                ','.join(sorted(clientdevs)))
379

  
380
            clientdevs = ['mvasilak']
381
            r = self.post(
382
                '/v1/%s?update=' % self.user,
383
                HTTP_X_ACCOUNT_GROUP_CLIENTSDEV=''.join(clientdevs))
384
            self.assertEqual(r.status_code, 202)
385

  
386
            account_groups = self.get_account_groups()
387
            self.assertTrue(
388
                'X-Account-Group-Pithosdev' in account_groups)
389
            self.assertTrue(
390
                'X-Account-Group-Clientsdev' in account_groups)
391
            self.assertEqual(
392
                account_groups['X-Account-Group-Pithosdev'],
393
                ','.join(sorted(pithosdevs)))
394
            self.assertEqual(
395
                account_groups['X-Account-Group-Clientsdev'],
396
                ','.join(sorted(clientdevs)))
397

  
398
    def test_reset_account_groups(self):
399
        with AssertMappingInvariant(self.get_account_meta):
400
            groups = {'pithosdev': ['verigak', 'gtsouk', 'chazapis'],
401
                      'clientsdev': ['pkanavos', 'mvasilak']}
402
            headers = dict(('HTTP_X_ACCOUNT_GROUP_%s' % k, ','.join(v))
403
                           for k, v in groups.iteritems())
404
            r = self.post('/v1/%s?update=' % self.user, **headers)
405
            self.assertEqual(r.status_code, 202)
406

  
407
            groups = {'pithosdev': ['verigak',
408
                                    'gtsouk',
409
                                    'chazapis',
410
                                    'papagian']}
411
            headers = dict(('HTTP_X_ACCOUNT_GROUP_%s' % k, ','.join(v))
412
                           for k, v in groups.iteritems())
413
            account_meta = self.get_account_meta()
414
            headers.update(dict(('HTTP_%s' % k.upper().replace('-', '_'), v)
415
                           for k, v in account_meta.iteritems()))
416
            r = self.post('/v1/%s' % self.user, **headers)
417
            self.assertEqual(r.status_code, 202)
418

  
419
            account_groups = self.get_account_groups()
420
            self.assertTrue(
421
                'X-Account-Group-Pithosdev' in account_groups)
422
            self.assertTrue(
423
                'X-Account-Group-Clientsdev' not in account_groups)
424
            self.assertEqual(
425
                account_groups['X-Account-Group-Pithosdev'],
426
                ','.join(sorted(groups['pithosdev'])))
427

  
428
    def test_delete_account_groups(self):
429
        with AssertMappingInvariant(self.get_account_meta):
430
            groups = {'pithosdev': ['verigak', 'gtsouk', 'chazapis'],
431
                      'clientsdev': ['pkanavos', 'mvasilak']}
432
            headers = dict(('HTTP_X_ACCOUNT_GROUP_%s' % k, ','.join(v))
433
                           for k, v in groups.iteritems())
434
            self.post('/v1/%s?update=' % self.user, **headers)
435

  
436
            kwargs = dict(('HTTP_X_ACCOUNT_GROUP_%s' % k, '')
437
                          for k, v in groups.items())
438
            r = self.post('/v1/%s?update=' % self.user, **kwargs)
439
            self.assertEqual(r.status_code, 202)
440

  
441
            account_groups = self.get_account_groups()
442
            self.assertTrue(
443
                'X-Account-Group-Pithosdev' not in account_groups)
444
            self.assertTrue(
445
                'X-Account-Group-Clientsdev' not in account_groups)
b/snf-pithos-app/pithos/api/test/containers.py
1
#!/usr/bin/env python
2
#coding=utf8
3

  
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

  
37
from pithos.api.test import PithosAPITest, DATE_FORMATS, o_names,\
38
    strnextling, pithos_settings, pithos_test_settings
39
from pithos.backends.random_word import get_random_word
40

  
41
import django.utils.simplejson as json
42
from django.http import urlencode
43

  
44
from xml.dom import minidom
45
from urllib import quote
46

  
47
import random
48
import datetime
49

  
50

  
51
class ContainerHead(PithosAPITest):
52
    def test_get_meta(self):
53
        self.create_container('apples')
54

  
55
        # populate with objects
56
        objects = {}
57
        for i in range(random.randint(1, 100)):
58

  
59
            # upload object
60
            meta = {'foo%s' % i: 'bar'}
61
            name, data, resp = self.upload_object('apples', **meta)
62
            objects[name] = data
63

  
64
        t1 = datetime.datetime.utcnow()
65
        r = self.head('/v1/%s/apples' % self.user)
66
        self.assertEqual(int(r['X-Container-Object-Count']), len(objects))
67
        self.assertEqual(int(r['X-Container-Bytes-Used']),
68
                         sum([len(i) for i in objects.values()]))
69
        self.assertTrue('X-Container-Block-Size' in r)
70
        self.assertTrue('X-Container-Block-Hash' in r)
71
        self.assertTrue('X-Container-Until-Timestamp' not in r)
72
        self.assertEqual(r['X-Container-Policy-Versioning'], 'auto')
73
        self.assertEqual(int(r['X-Container-Policy-Quota']), 0)
74
        t2 = datetime.datetime.strptime(r['Last-Modified'], DATE_FORMATS[2])
75
        delta = (t2 - t1)
76
        threashold = datetime.timedelta(seconds=1)
77
        self.assertTrue(delta < threashold)
78
        self.assertTrue(r['X-Container-Object-Meta'])
79
        (self.assertTrue('foo%s' % i in r['X-Container-Object-Meta'])
80
            for i in range(len(objects)))
81

  
82

  
83
class ContainerGet(PithosAPITest):
84
    def setUp(self):
85
        PithosAPITest.setUp(self)
86

  
87
        self.cnames = ['pears', 'apples']
88
        self.objects = {}
89
        for c in self.cnames:
90
            self.create_container(c)
91

  
92
        self.objects['pears'] = {}
93
        for o in o_names[:8]:
94
            name, data, resp = self.upload_object('pears', o)
95
            self.objects['pears'][name] = data
96
        self.objects['apples'] = {}
97
        for o in o_names[8:]:
98
            name, data, resp = self.upload_object('apples', o)
99
            self.objects['apples'][name] = data
100

  
101
    def test_list_shared(self):
102
        # share an object
103
        cname = self.cnames[0]
104
        onames = self.objects[cname].keys()
105
        oname = onames.pop()
106
        r = self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
107
                      content_type='',
108
                      HTTP_X_OBJECT_SHARING='read=*')
109
        self.assertEqual(r.status_code, 202)
110

  
111
        # publish another object
112
        other = onames.pop()
113
        r = self.post('/v1/%s/%s/%s' % (self.user, cname, other),
114
                      content_type='',
115
                      HTTP_X_OBJECT_PUBLIC='true')
116
        self.assertEqual(r.status_code, 202)
117

  
118
        # list shared and assert only the shared object is returned
119
        r = self.get('/v1/%s/%s?shared=' % (self.user, cname))
120
        self.assertEqual(r.status_code, 200)
121
        objects = r.content.split('\n')
122
        objects.remove('')
123
        self.assertEqual([oname], objects)
124

  
125
        # list detailed shared and assert only the shared object is returned
126
        r = self.get('/v1/%s/%s?shared=&format=json' % (self.user, cname))
127
        self.assertEqual(r.status_code, 200)
128
        try:
129
            objects = json.loads(r.content)
130
        except:
131
            self.fail('json format expected')
132
        self.assertEqual([oname], [o['name'] for o in objects])
133
        self.assertTrue('x_object_sharing' in objects[0])
134
        self.assertTrue('x_object_public' not in objects[0])
135

  
136
        # publish the shared object and assert it is still listed in the
137
        # shared objects
138
        r = self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
139
                      content_type='',
140
                      HTTP_X_OBJECT_PUBLIC='true')
141
        self.assertEqual(r.status_code, 202)
142
        r = self.get('/v1/%s/%s?shared=&format=json' % (self.user, cname))
143
        self.assertEqual(r.status_code, 200)
144
        try:
145
            objects = json.loads(r.content)
146
        except:
147
            self.fail('json format expected')
148
        self.assertEqual([oname], [o['name'] for o in objects])
149
        self.assertTrue('x_object_sharing' in objects[0])
150
        # TODO
151
        #self.assertTrue('x_object_public' in objects[0])
152

  
153
        # create child object
154
        descendant = strnextling(oname)
155
        self.upload_object(cname, descendant)
156
        # request shared and assert child obejct is not listed
157
        r = self.get('/v1/%s/%s?shared=' % (self.user, cname))
158
        self.assertEqual(r.status_code, 200)
159
        objects = r.content.split('\n')
160
        objects.remove('')
161
        self.assertTrue(oname in objects)
162
        self.assertTrue(descendant not in objects)
163

  
164
        # check folder inheritance
165
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_SHARING='read=*')
166
        # create child object
167
        descendant = '%s/%s' % (oname, get_random_word(8))
168
        self.upload_object(cname, descendant)
169
        # request shared
170
        r = self.get('/v1/%s/%s?shared=' % (self.user, cname))
171
        self.assertEqual(r.status_code, 200)
172
        objects = r.content.split('\n')
173
        objects.remove('')
174
        self.assertTrue(oname in objects)
175
        self.assertTrue(descendant in objects)
176

  
177
    def test_list_public(self):
178
        # publish an object
179
        cname = self.cnames[0]
180
        onames = self.objects[cname].keys()
181
        oname = onames.pop()
182
        r = self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
183
                      content_type='',
184
                      HTTP_X_OBJECT_PUBLIC='true')
185
        self.assertEqual(r.status_code, 202)
186

  
187
        # share another
188
        other = onames.pop()
189
        r = self.post('/v1/%s/%s/%s' % (self.user, cname, other),
190
                      content_type='',
191
                      HTTP_X_OBJECT_SHARING='read=alice')
192
        self.assertEqual(r.status_code, 202)
193

  
194
        # list public and assert only the public object is returned
195
        r = self.get('/v1/%s/%s?public=' % (self.user, cname))
196
        objects = r.content.split('\n')
197
        self.assertEqual(r.status_code, 200)
198
        self.assertTrue(oname in r.content.split('\n'))
199
        (self.assertTrue(o not in objects) for o in o_names[1:])
200

  
201
        # list detailed public and assert only the public object is returned
202
        r = self.get('/v1/%s/%s?public=&format=json' % (self.user, cname))
203
        self.assertEqual(r.status_code, 200)
204
        try:
205
            objects = json.loads(r.content)
206
        except:
207
            self.fail('json format expected')
208
        self.assertEqual([oname], [o['name'] for o in objects])
209
        self.assertTrue('x_object_sharing' not in objects[0])
210
        self.assertTrue('x_object_public' in objects[0])
211

  
212
        # share the public object and assert it is still listed in the
213
        # public objects
214
        r = self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
215
                      content_type='',
216
                      HTTP_X_OBJECT_SHARING='read=alice')
217
        self.assertEqual(r.status_code, 202)
218
        r = self.get('/v1/%s/%s?public=&format=json' % (self.user, cname))
219
        self.assertEqual(r.status_code, 200)
220
        try:
221
            objects = json.loads(r.content)
222
        except:
223
            self.fail('json format expected')
224
        self.assertEqual([oname], [o['name'] for o in objects])
225
        self.assertTrue('x_object_sharing' in objects[0])
226
        self.assertTrue('x_object_public' in objects[0])
227

  
228
        # Assert listing the container public contents is forbidden to not
229
        # shared users
230
        r = self.get('/v1/%s/%s?public=&format=json' % (
231
            self.user, cname), user='bob')
232
        self.assertEqual(r.status_code, 403)
233

  
234
        # Assert listing the container public contents to shared users
235
        r = self.get('/v1/%s/%s?public=&format=json' % (
236
            self.user, cname), user='alice')
237
        self.assertEqual(r.status_code, 200)
238
        try:
239
            objects = json.loads(r.content)
240
        except:
241
            self.fail('json format expected')
242
        # TODO
243
        #self.assertEqual([oname], [o['name'] for o in objects])
244
        self.assertTrue('x_object_sharing' in objects[0])
245
        # assert public is not returned though
246
        self.assertTrue('x_object_public' not in objects[0])
247

  
248
        # create child object
249
        descendant = strnextling(oname)
250
        self.upload_object(cname, descendant)
251
        # request public and assert child obejct is not listed
252
        r = self.get('/v1/%s/%s?public=' % (self.user, cname))
253
        objects = r.content.split('\n')
254
        objects.remove('')
255
        self.assertEqual(r.status_code, 200)
256
        self.assertTrue(oname in objects)
257
        (self.assertTrue(o not in objects) for o in o_names[1:])
258

  
259
        # test folder inheritance
260
        oname, _ = self.create_folder(cname, HTTP_X_OBJECT_PUBLIC='true')
261
        # create child object
262
        descendant = '%s/%s' % (oname, get_random_word(8))
263
        self.upload_object(cname, descendant)
264
        # request public
265
        r = self.get('/v1/%s/%s?public=' % (self.user, cname))
266
        self.assertEqual(r.status_code, 200)
267
        objects = r.content.split('\n')
268
        self.assertTrue(oname in objects)
269
        self.assertTrue(descendant not in objects)
270

  
271
#    def test_list_shared_public(self):
272
#        # publish an object
273
#        cname = self.cnames[0]
274
#        onames = self.objects[cname].keys()
275
#        oname = onames.pop()
276
#        r = self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
277
#                      content_type='',
278
#                      HTTP_X_OBJECT_PUBLIC='true')
279
#        self.assertEqual(r.status_code, 202)
280
#
281
#        # share another
282
#        other = onames.pop()
283
#        r = self.post('/v1/%s/%s/%s' % (self.user, cname, other),
284
#                      content_type='',
285
#                      HTTP_X_OBJECT_SHARING='read=alice')
286
#        self.assertEqual(r.status_code, 202)
287
#
288
#        # list shared and public objects and assert object is listed
289
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
290
#            self.user, cname))
291
#        self.assertEqual(r.status_code, 200)
292
#        objects = json.loads(r.content)
293
#        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
294
#        for o in objects:
295
#            if o['name'] == oname:
296
#                self.assertTrue('x_object_public' in objects[0])
297
#            elif o['name'] == other:
298
#                self.assertTrue('x_object_sharing' in objects[1])
299
#
300
#        # assert not listing shared and public to a not shared user
301
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
302
#            self.user, cname), user='bob')
303
#        self.assertEqual(r.status_code, 403)
304
#
305
#        # assert listing shared and public to a shared user
306
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
307
#            self.user, cname), user='alice')
308
#        self.assertEqual(r.status_code, 200)
309
#        try:
310
#            objects = json.loads(r.content)
311
#        except:
312
#            self.fail('json format expected')
313
#        self.assertEqual([o['name'] for o in objects], sorted([oname, other]))
314
#
315
#        # create child object
316
#        descentant1 = strnextling(oname)
317
#        self.upload_object(cname, descendant1)
318
#        descentant2 = strnextling(other)
319
#        self.upload_object(cname, descendant2)
320
#        r = self.get('/v1/%s/%s?shared=&public=&format=json' % (
321
#            self.user, cname), user='alice')
322
#        self.assertEqual(r.status_code, 200)
323
#        try:
324
#            objects = json.loads(r.content)
325
#        except:
326
#            self.fail('json format expected')
327
#        self.assertEqual([o['name'] for o in objects], [oname])
328
#
329
#        # test inheritance
330
#        oname1, _ = self.create_folder(cname,
331
#                                       HTTP_X_OBJECT_SHARING='read=alice')
332
#        # create child object
333
#        descendant1 = '%s/%s' % (oname, get_random_word(8))
334
#        self.upload_object(cname, descendant1)
335
#
336
#        oname2, _ = self.create_folder(cname,
337
#                                       HTTP_X_OBJECT_PUBLIC='true')
338
#        # create child object
339
#        descendant2 = '%s/%s' % (oname, get_random_word(8))
340
#        self.upload_object(cname, descendant2)
341
#
342
#
343
#        o = self.upload_random_data(self.container[1], 'folder2/object')
344
#        objs = self.client.list_objects(self.container[1], shared=True, public=True)
345
#        self.assertEqual(objs, ['folder1', 'folder1/object', 'folder2'])
346
#        objs = cl.list_objects(
347
#            self.container[1], shared=True, public=True, account=get_user()
348
#        )
349
#        self.assertEqual(objs, ['folder1', 'folder1/object'])
350
#
351
    def test_list_objects(self):
352
        cname = self.cnames[0]
353
        r = self.get('/v1/%s/%s' % (self.user, cname))
354
        self.assertTrue(r.status_code, 200)
355
        objects = r.content.split('\n')
356
        if '' in objects:
357
            objects.remove('')
358
        self.assertEqual(objects, sorted(self.objects[cname].keys()))
359

  
360
    def test_list_objects_containing_slash(self):
361
        self.create_container('test')
362
        self.upload_object('test', '/objectname')
363

  
364
        r = self.get('/v1/%s/test' % self.user)
365
        objects = r.content.split('\n')
366
        if '' in objects:
367
            objects.remove('')
368
        self.assertEqual(objects, ['/objectname'])
369

  
370
        r = self.get('/v1/%s/test?format=json' % self.user)
371
        try:
372
            objects = json.loads(r.content)
373
        except:
374
            self.fail('json format expected')
375
        self.assertEqual([o['name'] for o in objects], ['/objectname'])
376

  
377
        r = self.get('/v1/%s/test?format=xml' % self.user)
378
        try:
379
            objects = minidom.parseString(r.content)
380
        except:
381
            self.fail('xml format expected')
382
        self.assertEqual(
383
            [n.firstChild.data for n in objects.getElementsByTagName('name')],
384
            ['/objectname'])
385

  
386
    def test_list_objects_with_limit_marker(self):
387
        cname = self.cnames[0]
388
        r = self.get('/v1/%s/%s?limit=qwert' % (self.user, cname))
389
        self.assertTrue(r.status_code != 500)
390

  
391
        r = self.get('/v1/%s/%s?limit=2' % (self.user, cname))
392
        self.assertEqual(r.status_code, 200)
393
        objects = r.content.split('\n')
394
        if '' in objects:
395
            objects.remove('')
396

  
397
        onames = sorted(self.objects[cname].keys())
398
        self.assertEqual(objects, onames[:2])
399

  
400
        markers = ['How To Win Friends And Influence People.pdf',
401
                   'moms_birthday.jpg']
402
        limit = 4
403
        for m in markers:
404
            r = self.get('/v1/%s/%s?limit=%s&marker=%s' % (
405
                self.user, cname, limit, m))
406
            objects = r.content.split('\n')
407
            if '' in objects:
408
                objects.remove('')
409
            start = onames.index(m) + 1
410
            end = start + limit
411
            end = end if len(onames) >= end else len(onames)
412
            self.assertEqual(objects, onames[start:end])
413

  
414
    @pithos_test_settings(API_LIST_LIMIT=10)
415
    def test_list_limit_exceeds(self):
416
        self.create_container('container')
417

  
418
        for _ in range(pithos_settings.API_LIST_LIMIT + 1):
419
            self.upload_object('container')
420

  
421
        r = self.get('/v1/%s/container?format=json' % self.user)
422
        try:
423
            objects = json.loads(r.content)
424
        except:
425
            self.fail('json format expected')
426
        self.assertEqual(pithos_settings.API_LIST_LIMIT,
427
                         len(objects))
428

  
429
    def test_list_pseudo_hierarchical_folders(self):
430
        r = self.get('/v1/%s/apples?prefix=photos&delimiter=/' % self.user)
431
        self.assertEqual(r.status_code, 200)
432
        objects = r.content.split('\n')
433
        if '' in objects:
434
            objects.remove('')
435
        self.assertEquals(
436
            ['photos/animals/', 'photos/me.jpg', 'photos/plants/'],
437
            objects)
438

  
439
        r = self.get(
440
            '/v1/%s/apples?prefix=photos/animals&delimiter=/' % self.user)
441
        objects = r.content.split('\n')
442
        if '' in objects:
443
            objects.remove('')
444
        self.assertEquals(
445
            ['photos/animals/cats/', 'photos/animals/dogs/'], objects)
446

  
447
        r = self.get('/v1/%s/apples?path=photos' % self.user)
448
        objects = r.content.split('\n')
449
        if '' in objects:
450
            objects.remove('')
451
        self.assertEquals(['photos/me.jpg'], objects)
452

  
453
    def test_extended_list_json(self):
454
        params = {'format': 'json', 'limit': 2, 'prefix': 'photos/animals',
455
                  'delimiter': '/'}
456
        r = self.get('/v1/%s/apples?%s' % (self.user, urlencode(params)))
457
        self.assertEqual(r.status_code, 200)
458
        try:
459
            objects = json.loads(r.content)
460
        except:
461
            self.fail('json format expected')
462
        self.assertEqual(objects[0]['subdir'], 'photos/animals/cats/')
463
        self.assertEqual(objects[1]['subdir'], 'photos/animals/dogs/')
464

  
465
    def test_extended_list_xml(self):
466
        params = {'format': 'xml', 'limit': 4, 'prefix': 'photos',
467
                  'delimiter': '/'}
468
        r = self.get('/v1/%s/apples?%s' % (self.user, urlencode(params)))
469
        self.assertEqual(r.status_code, 200)
470
        try:
471
            xml = minidom.parseString(r.content)
472
        except:
473
            self.fail('xml format expected')
474
        self.assert_extended(xml, 'xml', 'object', size=4)
475
        dirs = xml.getElementsByTagName('subdir')
476
        self.assertEqual(len(dirs), 2)
477
        self.assertEqual(dirs[0].attributes['name'].value, 'photos/animals/')
478
        self.assertEqual(dirs[1].attributes['name'].value, 'photos/plants/')
479

  
480
        objects = xml.getElementsByTagName('name')
481
        self.assertEqual(len(objects), 1)
482
        self.assertEqual(objects[0].childNodes[0].data, 'photos/me.jpg')
483

  
484
    def test_list_meta_double_matching(self):
485
        # update object meta
486
        cname = 'apples'
487
        oname = self.objects[cname].keys().pop()
488
        meta = {'quality': 'aaa', 'stock': 'true'}
489
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
490
                       for k, v in meta.iteritems())
491
        self.post('/v1/%s/%s/%s' % (self.user, cname, oname),
492
                  content_type='', **headers)
493

  
494
        # list objects that satisfy the criteria
495
        r = self.get('/v1/%s/%s?meta=Quality,Stock' % (self.user, cname))
496
        self.assertEqual(r.status_code, 200)
497
        objects = r.content.split('\n')
498
        if '' in objects:
499
            objects.remove('')
500
        self.assertEqual(objects, [oname])
501

  
502
    def tearDown(self):
503
        pass
504

  
505
    def test_list_using_meta(self):
506
        # update object meta
507
        cname = 'apples'
508
        oname1 = self.objects[cname].keys().pop()
509
        self.post('/v1/%s/%s/%s' % (self.user, cname, oname1),
510
                  content_type='', HTTP_X_OBJECT_META_QUALITY='aaa')
511
        oname2 = self.objects[cname].keys().pop()
512
        self.post('/v1/%s/%s/%s' % (self.user, cname, oname2),
513
                  content_type='', HTTP_X_OBJECT_META_QUALITY='ab')
514

  
515
        oname3 = self.objects[cname].keys().pop()
516
        self.post('/v1/%s/%s/%s' % (self.user, cname, oname3),
517
                  content_type='', HTTP_X_OBJECT_META_STOCK='100')
518
        oname4 = self.objects[cname].keys().pop()
519
        self.post('/v1/%s/%s/%s' % (self.user, cname, oname4),
520
                  content_type='', HTTP_X_OBJECT_META_STOCK='200')
521

  
522
        # test multiple existence criteria matches
523
        r = self.get('/v1/%s/%s?meta=Quality,Stock' % (self.user, cname))
524
        self.assertEqual(r.status_code, 200)
525
        objects = r.content.split('\n')
526
        if '' in objects:
527
            objects.remove('')
528
        self.assertTrue(objects, sorted([oname1, oname2, oname3, oname4]))
529

  
530
        # list objects that satisfy the existence criteria
531
        r = self.get('/v1/%s/%s?meta=Stock' % (self.user, cname))
532
        self.assertEqual(r.status_code, 200)
533
        objects = r.content.split('\n')
534
        if '' in objects:
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff