Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ 3cdb2b79

History | View | Annotate | Download (14.8 kB)

1
#!/usr/bin/env python
2
#coding=utf8
3

    
4
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36

    
37
from urlparse import urlunsplit, urlsplit
38
from xml.dom import minidom
39

    
40
from snf_django.utils.testing import with_settings, astakos_user
41

    
42
from pithos.backends.random_word import get_random_word
43
from pithos.api import settings as pithos_settings
44

    
45
from django.test import TestCase
46
from django.utils.http import urlencode
47
from django.conf import settings
48

    
49
import django.utils.simplejson as json
50

    
51
import re
52
import random
53
import threading
54
import functools
55

    
56
pithos_test_settings = functools.partial(with_settings, pithos_settings)
57

    
58
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
59
                "%A, %d-%b-%y %H:%M:%S GMT",
60
                "%a, %d %b %Y %H:%M:%S GMT"]
61

    
62
o_names = ['kate.jpg',
63
           'kate_beckinsale.jpg',
64
           'How To Win Friends And Influence People.pdf',
65
           'moms_birthday.jpg',
66
           'poodle_strut.mov',
67
           'Disturbed - Down With The Sickness.mp3',
68
           'army_of_darkness.avi',
69
           'the_mad.avi',
70
           'photos/animals/dogs/poodle.jpg',
71
           'photos/animals/dogs/terrier.jpg',
72
           'photos/animals/cats/persian.jpg',
73
           'photos/animals/cats/siamese.jpg',
74
           'photos/plants/fern.jpg',
75
           'photos/plants/rose.jpg',
76
           'photos/me.jpg']
77

    
78
details = {'container': ('name', 'count', 'bytes', 'last_modified',
79
                         'x_container_policy'),
80
           'object': ('name', 'hash', 'bytes', 'content_type',
81
                      'content_encoding', 'last_modified',)}
82

    
83
return_codes = (400, 401, 403, 404, 503)
84

    
85

    
86
class PithosAPITest(TestCase):
87
    #TODO unauthorized request
88
    def setUp(self):
89
        pithos_settings.BACKEND_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
90
        pithos_settings.BACKEND_DB_CONNECTION = construct_db_connection()
91
        pithos_settings.BACKEND_POOL_SIZE = 1
92
        self.user = 'user'
93

    
94
    def tearDown(self):
95
        #delete additionally created metadata
96
        meta = self.get_account_meta()
97
        self.delete_account_meta(meta)
98

    
99
        #delete additionally created groups
100
        groups = self.get_account_groups()
101
        self.delete_account_groups(groups)
102

    
103
        self._clean_account()
104

    
105
    def head(self, url, user='user', *args, **kwargs):
106
        with astakos_user(user):
107
            response = self.client.head(url, *args, **kwargs)
108
        return response
109

    
110
    def get(self, url, user='user', *args, **kwargs):
111
        with astakos_user(user):
112
            response = self.client.get(url, *args, **kwargs)
113
        return response
114

    
115
    def delete(self, url, user='user', *args, **kwargs):
116
        with astakos_user(user):
117
            response = self.client.delete(url, *args, **kwargs)
118
        return response
119

    
120
    def post(self, url, user='user', *args, **kwargs):
121
        with astakos_user(user):
122
            kwargs.setdefault('content_type', 'application/octet-stream')
123
            response = self.client.post(url, *args, **kwargs)
124
        return response
125

    
126
    def put(self, url, user='user', *args, **kwargs):
127
        with astakos_user(user):
128
            kwargs.setdefault('content_type', 'application/octet-stream')
129
            response = self.client.put(url, *args, **kwargs)
130
        return response
131

    
132
    def _clean_account(self):
133
        for c in self.list_containers():
134
            self.delete_container_content(c['name'])
135
            self.delete_container(c['name'])
136

    
137
    def update_account_meta(self, meta):
138
        kwargs = dict(
139
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
140
        r = self.post('/v1/%s?update=' % self.user, **kwargs)
141
        self.assertEqual(r.status_code, 202)
142
        account_meta = self.get_account_meta()
143
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
144
            k in meta.keys())
145
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
146
            k, v in meta.items())
147

    
148
    def reset_account_meta(self, meta):
149
        kwargs = dict(
150
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
151
        r = self.post('/v1/%s' % self.user, **kwargs)
152
        self.assertEqual(r.status_code, 202)
153
        account_meta = self.get_account_meta()
154
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
155
            k in meta.keys())
156
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
157
            k, v in meta.items())
158

    
159
    def delete_account_meta(self, meta):
160
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
161
        kwargs = dict((transform(k), '') for k, v in meta.items())
162
        r = self.post('/v1/%s?update=' % self.user, **kwargs)
163
        self.assertEqual(r.status_code, 202)
164
        account_meta = self.get_account_meta()
165
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
166
            k in meta.keys())
167
        return r
168

    
169
    def delete_account_groups(self, groups):
170
        r = self.post('/v1/%s?update=' % self.user, **groups)
171
        self.assertEqual(r.status_code, 202)
172
        return r
173

    
174
    def get_account_info(self, until=None):
175
        url = '/v1/%s' % self.user
176
        if until is not None:
177
            parts = list(urlsplit(url))
178
            parts[3] = urlencode({
179
                'until': until
180
            })
181
            url = urlunsplit(parts)
182
        r = self.head(url)
183
        self.assertEqual(r.status_code, 204)
184
        return r
185

    
186
    def get_account_meta(self, until=None):
187
        r = self.get_account_info(until=until)
188
        headers = dict(r._headers.values())
189
        map(headers.pop,
190
            [k for k in headers.keys()
191
                if not k.startswith('X-Account-Meta-')])
192
        return headers
193

    
194
    def get_account_groups(self, until=None):
195
        r = self.get_account_info(until=until)
196
        headers = dict(r._headers.values())
197
        map(headers.pop,
198
            [k for k in headers.keys()
199
                if not k.startswith('X-Account-Group-')])
200
        return headers
201

    
202
    def list_containers(self, format='json', headers={}, **params):
203
        url = '/v1/%s' % self.user
204
        parts = list(urlsplit(url))
205
        params['format'] = format
206
        parts[3] = urlencode(params)
207
        url = urlunsplit(parts)
208
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
209
                        for k, v in headers.items())
210
        r = self.get(url, **_headers)
211

    
212
        if format is None:
213
            containers = r.content.split('\n')
214
            if '' in containers:
215
                containers.remove('')
216
            return containers
217
        elif format == 'json':
218
            try:
219
                containers = json.loads(r.content)
220
            except:
221
                self.fail('json format expected')
222
            return containers
223
        elif format == 'xml':
224
            return minidom.parseString(r.content)
225

    
226
    def delete_container_content(self, cname):
227
        r = self.delete('/v1/%s/%s?delimiter=/' % (self.user, cname))
228
        self.assertEqual(r.status_code, 204)
229
        return r
230

    
231
    def delete_container(self, cname):
232
        r = self.delete('/v1/%s/%s' % (self.user, cname))
233
        self.assertEqual(r.status_code, 204)
234
        return r
235

    
236
    def create_container(self, cname):
237
        r = self.put('/v1/%s/%s' % (self.user, cname), data='')
238
        self.assertTrue(r.status_code in (202, 201))
239
        return r
240

    
241
    def upload_object(self, cname, oname=None, **meta):
242
        oname = oname or get_random_word(8)
243
        data = get_random_word(length=random.randint(1, 1024))
244
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
245
                       for k, v in meta.iteritems())
246
        r = self.put('/v1/%s/%s/%s' % (
247
            self.user, cname, oname), data=data, **headers)
248
        self.assertEqual(r.status_code, 201)
249
        return oname, data, r
250

    
251
    def create_folder(self, cname, oname=get_random_word(8), **headers):
252
        r = self.put('/v1/%s/%s/%s' % (
253
            self.user, cname, oname), data='',
254
            content_type='application/directory',
255
            **headers)
256
        self.assertEqual(r.status_code, 201)
257
        return oname, r
258

    
259
    def list_objects(self, cname):
260
        r = self.get('/v1/%s/%s?format=json' % (self.user, cname))
261
        self.assertTrue(r.status_code in (200, 204))
262
        try:
263
            objects = json.loads(r.content)
264
        except:
265
            self.fail('json format expected')
266
        return objects
267

    
268
    def assert_status(self, status, codes):
269
        l = [elem for elem in return_codes]
270
        if isinstance(codes, list):
271
            l.extend(codes)
272
        else:
273
            l.append(codes)
274
        self.assertTrue(status in l)
275

    
276
    def assert_extended(self, data, format, type, size=10000):
277
        if format == 'xml':
278
            self._assert_xml(data, type, size)
279
        elif format == 'json':
280
            self._assert_json(data, type, size)
281

    
282
    def _assert_json(self, data, type, size):
283
        convert = lambda s: s.lower()
284
        info = [convert(elem) for elem in details[type]]
285
        self.assertTrue(len(data) <= size)
286
        for item in info:
287
            for i in data:
288
                if 'subdir' in i.keys():
289
                    continue
290
                self.assertTrue(item in i.keys())
291

    
292
    def _assert_xml(self, data, type, size):
293
        convert = lambda s: s.lower()
294
        info = [convert(elem) for elem in details[type]]
295
        try:
296
            info.remove('content_encoding')
297
        except ValueError:
298
            pass
299
        xml = data
300
        entities = xml.getElementsByTagName(type)
301
        self.assertTrue(len(entities) <= size)
302
        for e in entities:
303
            for item in info:
304
                self.assertTrue(e.getElementsByTagName(item))
305

    
306

    
307
class AssertMappingInvariant(object):
308
    def __init__(self, callable, *args, **kwargs):
309
        self.callable = callable
310
        self.args = args
311
        self.kwargs = kwargs
312

    
313
    def __enter__(self):
314
        self.map = self.callable(*self.args, **self.kwargs)
315
        return self.map
316

    
317
    def __exit__(self, type, value, tb):
318
        map = self.callable(*self.args, **self.kwargs)
319
        for k, v in self.map.items():
320
            if is_date(v):
321
                continue
322

    
323
            assert(k in map), '%s not in map' % k
324
            assert v == map[k]
325

    
326
django_sqlalchemy_engines = {
327
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
328
    'django.db.backends.postgresql': 'postgresql',
329
    'django.db.backends.mysql': '',
330
    'django.db.backends.sqlite3': 'mssql',
331
    'django.db.backends.oracle': 'oracle'}
332

    
333

    
334
def construct_db_connection():
335
    """Convert the django default database to an sqlalchemy connection
336
       string"""
337
    db = settings.DATABASES['default']
338
    if db['ENGINE'] == 'django.db.backends.sqlite3':
339
        return 'sqlite://'
340
    else:
341
        d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
342
                 user=db['USER'],
343
                 pwd=db['PASSWORD'],
344
                 host=db['HOST'].lower(),
345
                 port=int(db['PORT']) if db['PORT'] != '' else '',
346
                 name=db['NAME'])
347
        return '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d
348

    
349

    
350
def is_date(date):
351
    __D = r'(?P<day>\d{2})'
352
    __D2 = r'(?P<day>[ \d]\d)'
353
    __M = r'(?P<mon>\w{3})'
354
    __Y = r'(?P<year>\d{4})'
355
    __Y2 = r'(?P<year>\d{2})'
356
    __T = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})'
357
    RFC1123_DATE = re.compile(r'^\w{3}, %s %s %s %s GMT$' % (
358
        __D, __M, __Y, __T))
359
    RFC850_DATE = re.compile(r'^\w{6,9}, %s-%s-%s %s GMT$' % (
360
        __D, __M, __Y2, __T))
361
    ASCTIME_DATE = re.compile(r'^\w{3} %s %s %s %s$' % (
362
        __M, __D2, __T, __Y))
363
    for regex in RFC1123_DATE, RFC850_DATE, ASCTIME_DATE:
364
        m = regex.match(date)
365
        if m is not None:
366
            return True
367
    return False
368

    
369

    
370
def strnextling(prefix):
371
    """Return the first unicode string
372
       greater than but not starting with given prefix.
373
       strnextling('hello') -> 'hellp'
374
    """
375
    if not prefix:
376
        ## all strings start with the null string,
377
        ## therefore we have to approximate strnextling('')
378
        ## with the last unicode character supported by python
379
        ## 0x10ffff for wide (32-bit unicode) python builds
380
        ## 0x00ffff for narrow (16-bit unicode) python builds
381
        ## We will not autodetect. 0xffff is safe enough.
382
        return unichr(0xffff)
383
    s = prefix[:-1]
384
    c = ord(prefix[-1])
385
    if c >= 0xffff:
386
        raise RuntimeError
387
    s += unichr(c + 1)
388
    return s
389

    
390

    
391
def test_concurrently(times=2):
392
    """
393
    Add this decorator to small pieces of code that you want to test
394
    concurrently to make sure they don't raise exceptions when run at the
395
    same time.  E.g., some Django views that do a SELECT and then a subsequent
396
    INSERT might fail when the INSERT assumes that the data has not changed
397
    since the SELECT.
398
    """
399
    def test_concurrently_decorator(test_func):
400
        def wrapper(*args, **kwargs):
401
            exceptions = []
402

    
403
            def call_test_func():
404
                try:
405
                    test_func(*args, **kwargs)
406
                except Exception, e:
407
                    exceptions.append(e)
408
                    raise
409

    
410
            threads = []
411
            for i in range(times):
412
                threads.append(threading.Thread())
413
            for t in threads:
414
                t.start()
415
            for t in threads:
416
                t.join()
417
            if exceptions:
418
                raise Exception(
419
                    ('test_concurrently intercepted %s',
420
                     'exceptions: %s') % (len(exceptions), exceptions))
421
        return wrapper
422
    return test_concurrently_decorator