Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-app / pithos / api / test / __init__.py @ f3787696

History | View | Annotate | Download (14.8 kB)

1 f3787696 Sofia Papagiannaki
#!/usr/bin/env python
2 f3787696 Sofia Papagiannaki
#coding=utf8
3 f3787696 Sofia Papagiannaki
4 f3787696 Sofia Papagiannaki
# Copyright 2011-2013 GRNET S.A. All rights reserved.
5 f3787696 Sofia Papagiannaki
#
6 f3787696 Sofia Papagiannaki
# Redistribution and use in source and binary forms, with or
7 f3787696 Sofia Papagiannaki
# without modification, are permitted provided that the following
8 f3787696 Sofia Papagiannaki
# conditions are met:
9 f3787696 Sofia Papagiannaki
#
10 f3787696 Sofia Papagiannaki
#   1. Redistributions of source code must retain the above
11 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
12 f3787696 Sofia Papagiannaki
#      disclaimer.
13 f3787696 Sofia Papagiannaki
#
14 f3787696 Sofia Papagiannaki
#   2. Redistributions in binary form must reproduce the above
15 f3787696 Sofia Papagiannaki
#      copyright notice, this list of conditions and the following
16 f3787696 Sofia Papagiannaki
#      disclaimer in the documentation and/or other materials
17 f3787696 Sofia Papagiannaki
#      provided with the distribution.
18 f3787696 Sofia Papagiannaki
#
19 f3787696 Sofia Papagiannaki
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20 f3787696 Sofia Papagiannaki
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 f3787696 Sofia Papagiannaki
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 f3787696 Sofia Papagiannaki
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23 f3787696 Sofia Papagiannaki
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 f3787696 Sofia Papagiannaki
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 f3787696 Sofia Papagiannaki
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 f3787696 Sofia Papagiannaki
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27 f3787696 Sofia Papagiannaki
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28 f3787696 Sofia Papagiannaki
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29 f3787696 Sofia Papagiannaki
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 f3787696 Sofia Papagiannaki
# POSSIBILITY OF SUCH DAMAGE.
31 f3787696 Sofia Papagiannaki
#
32 f3787696 Sofia Papagiannaki
# The views and conclusions contained in the software and
33 f3787696 Sofia Papagiannaki
# documentation are those of the authors and should not be
34 f3787696 Sofia Papagiannaki
# interpreted as representing official policies, either expressed
35 f3787696 Sofia Papagiannaki
# or implied, of GRNET S.A.
36 f3787696 Sofia Papagiannaki
37 f3787696 Sofia Papagiannaki
from urlparse import urlunsplit, urlsplit
38 f3787696 Sofia Papagiannaki
from xml.dom import minidom
39 f3787696 Sofia Papagiannaki
40 f3787696 Sofia Papagiannaki
from snf_django.utils.testing import with_settings, astakos_user
41 f3787696 Sofia Papagiannaki
42 f3787696 Sofia Papagiannaki
from pithos.backends.random_word import get_random_word
43 f3787696 Sofia Papagiannaki
from pithos.api import settings as pithos_settings
44 f3787696 Sofia Papagiannaki
45 f3787696 Sofia Papagiannaki
from django.test import TestCase
46 f3787696 Sofia Papagiannaki
from django.utils.http import urlencode
47 f3787696 Sofia Papagiannaki
from django.conf import settings
48 f3787696 Sofia Papagiannaki
49 f3787696 Sofia Papagiannaki
import django.utils.simplejson as json
50 f3787696 Sofia Papagiannaki
51 f3787696 Sofia Papagiannaki
import re
52 f3787696 Sofia Papagiannaki
import random
53 f3787696 Sofia Papagiannaki
import threading
54 f3787696 Sofia Papagiannaki
import functools
55 f3787696 Sofia Papagiannaki
56 f3787696 Sofia Papagiannaki
pithos_test_settings = functools.partial(with_settings, pithos_settings)
57 f3787696 Sofia Papagiannaki
58 f3787696 Sofia Papagiannaki
DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
59 f3787696 Sofia Papagiannaki
                "%A, %d-%b-%y %H:%M:%S GMT",
60 f3787696 Sofia Papagiannaki
                "%a, %d %b %Y %H:%M:%S GMT"]
61 f3787696 Sofia Papagiannaki
62 f3787696 Sofia Papagiannaki
o_names = ['kate.jpg',
63 f3787696 Sofia Papagiannaki
           'kate_beckinsale.jpg',
64 f3787696 Sofia Papagiannaki
           'How To Win Friends And Influence People.pdf',
65 f3787696 Sofia Papagiannaki
           'moms_birthday.jpg',
66 f3787696 Sofia Papagiannaki
           'poodle_strut.mov',
67 f3787696 Sofia Papagiannaki
           'Disturbed - Down With The Sickness.mp3',
68 f3787696 Sofia Papagiannaki
           'army_of_darkness.avi',
69 f3787696 Sofia Papagiannaki
           'the_mad.avi',
70 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/poodle.jpg',
71 f3787696 Sofia Papagiannaki
           'photos/animals/dogs/terrier.jpg',
72 f3787696 Sofia Papagiannaki
           'photos/animals/cats/persian.jpg',
73 f3787696 Sofia Papagiannaki
           'photos/animals/cats/siamese.jpg',
74 f3787696 Sofia Papagiannaki
           'photos/plants/fern.jpg',
75 f3787696 Sofia Papagiannaki
           'photos/plants/rose.jpg',
76 f3787696 Sofia Papagiannaki
           'photos/me.jpg']
77 f3787696 Sofia Papagiannaki
78 f3787696 Sofia Papagiannaki
details = {'container': ('name', 'count', 'bytes', 'last_modified',
79 f3787696 Sofia Papagiannaki
                         'x_container_policy'),
80 f3787696 Sofia Papagiannaki
           'object': ('name', 'hash', 'bytes', 'content_type',
81 f3787696 Sofia Papagiannaki
                      'content_encoding', 'last_modified',)}
82 f3787696 Sofia Papagiannaki
83 f3787696 Sofia Papagiannaki
return_codes = (400, 401, 403, 404, 503)
84 f3787696 Sofia Papagiannaki
85 f3787696 Sofia Papagiannaki
86 f3787696 Sofia Papagiannaki
class PithosAPITest(TestCase):
87 f3787696 Sofia Papagiannaki
    #TODO unauthorized request
88 f3787696 Sofia Papagiannaki
    def setUp(self):
89 f3787696 Sofia Papagiannaki
        pithos_settings.BACKEND_DB_MODULE = 'pithos.backends.lib.sqlalchemy'
90 f3787696 Sofia Papagiannaki
        pithos_settings.BACKEND_DB_CONNECTION = construct_db_connection()
91 f3787696 Sofia Papagiannaki
        pithos_settings.BACKEND_POOL_SIZE = 1
92 f3787696 Sofia Papagiannaki
        self.user = 'user'
93 f3787696 Sofia Papagiannaki
94 f3787696 Sofia Papagiannaki
    def tearDown(self):
95 f3787696 Sofia Papagiannaki
        #delete additionally created metadata
96 f3787696 Sofia Papagiannaki
        meta = self.get_account_meta()
97 f3787696 Sofia Papagiannaki
        self.delete_account_meta(meta)
98 f3787696 Sofia Papagiannaki
99 f3787696 Sofia Papagiannaki
        #delete additionally created groups
100 f3787696 Sofia Papagiannaki
        groups = self.get_account_groups()
101 f3787696 Sofia Papagiannaki
        self.delete_account_groups(groups)
102 f3787696 Sofia Papagiannaki
103 f3787696 Sofia Papagiannaki
        self._clean_account()
104 f3787696 Sofia Papagiannaki
105 f3787696 Sofia Papagiannaki
    def head(self, url, user='user', *args, **kwargs):
106 f3787696 Sofia Papagiannaki
        with astakos_user(user):
107 f3787696 Sofia Papagiannaki
            response = self.client.head(url, *args, **kwargs)
108 f3787696 Sofia Papagiannaki
        return response
109 f3787696 Sofia Papagiannaki
110 f3787696 Sofia Papagiannaki
    def get(self, url, user='user', *args, **kwargs):
111 f3787696 Sofia Papagiannaki
        with astakos_user(user):
112 f3787696 Sofia Papagiannaki
            response = self.client.get(url, *args, **kwargs)
113 f3787696 Sofia Papagiannaki
        return response
114 f3787696 Sofia Papagiannaki
115 f3787696 Sofia Papagiannaki
    def delete(self, url, user='user', *args, **kwargs):
116 f3787696 Sofia Papagiannaki
        with astakos_user(user):
117 f3787696 Sofia Papagiannaki
            response = self.client.delete(url, *args, **kwargs)
118 f3787696 Sofia Papagiannaki
        return response
119 f3787696 Sofia Papagiannaki
120 f3787696 Sofia Papagiannaki
    def post(self, url, user='user', *args, **kwargs):
121 f3787696 Sofia Papagiannaki
        with astakos_user(user):
122 f3787696 Sofia Papagiannaki
            kwargs.setdefault('content_type', 'application/octet-stream')
123 f3787696 Sofia Papagiannaki
            response = self.client.post(url, *args, **kwargs)
124 f3787696 Sofia Papagiannaki
        return response
125 f3787696 Sofia Papagiannaki
126 f3787696 Sofia Papagiannaki
    def put(self, url, user='user', *args, **kwargs):
127 f3787696 Sofia Papagiannaki
        with astakos_user(user):
128 f3787696 Sofia Papagiannaki
            kwargs.setdefault('content_type', 'application/octet-stream')
129 f3787696 Sofia Papagiannaki
            response = self.client.put(url, *args, **kwargs)
130 f3787696 Sofia Papagiannaki
        return response
131 f3787696 Sofia Papagiannaki
132 f3787696 Sofia Papagiannaki
    def _clean_account(self):
133 f3787696 Sofia Papagiannaki
        for c in self.list_containers():
134 f3787696 Sofia Papagiannaki
            self.delete_container_content(c['name'])
135 f3787696 Sofia Papagiannaki
            self.delete_container(c['name'])
136 f3787696 Sofia Papagiannaki
137 f3787696 Sofia Papagiannaki
    def update_account_meta(self, meta):
138 f3787696 Sofia Papagiannaki
        kwargs = dict(
139 f3787696 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
140 f3787696 Sofia Papagiannaki
        r = self.post('/v1/%s?update=' % self.user, **kwargs)
141 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
142 f3787696 Sofia Papagiannaki
        account_meta = self.get_account_meta()
143 f3787696 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
144 f3787696 Sofia Papagiannaki
            k in meta.keys())
145 f3787696 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
146 f3787696 Sofia Papagiannaki
            k, v in meta.items())
147 f3787696 Sofia Papagiannaki
148 f3787696 Sofia Papagiannaki
    def reset_account_meta(self, meta):
149 f3787696 Sofia Papagiannaki
        kwargs = dict(
150 f3787696 Sofia Papagiannaki
            ('HTTP_X_ACCOUNT_META_%s' % k, str(v)) for k, v in meta.items())
151 f3787696 Sofia Papagiannaki
        r = self.post('/v1/%s' % self.user, **kwargs)
152 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
153 f3787696 Sofia Papagiannaki
        account_meta = self.get_account_meta()
154 f3787696 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k in account_meta) for
155 f3787696 Sofia Papagiannaki
            k in meta.keys())
156 f3787696 Sofia Papagiannaki
        (self.assertEqual(account_meta['X-Account-Meta-%s' % k], v) for
157 f3787696 Sofia Papagiannaki
            k, v in meta.items())
158 f3787696 Sofia Papagiannaki
159 f3787696 Sofia Papagiannaki
    def delete_account_meta(self, meta):
160 f3787696 Sofia Papagiannaki
        transform = lambda k: 'HTTP_%s' % k.replace('-', '_').upper()
161 f3787696 Sofia Papagiannaki
        kwargs = dict((transform(k), '') for k, v in meta.items())
162 f3787696 Sofia Papagiannaki
        r = self.post('/v1/%s?update=' % self.user, **kwargs)
163 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
164 f3787696 Sofia Papagiannaki
        account_meta = self.get_account_meta()
165 f3787696 Sofia Papagiannaki
        (self.assertTrue('X-Account-Meta-%s' % k not in account_meta) for
166 f3787696 Sofia Papagiannaki
            k in meta.keys())
167 f3787696 Sofia Papagiannaki
        return r
168 f3787696 Sofia Papagiannaki
169 f3787696 Sofia Papagiannaki
    def delete_account_groups(self, groups):
170 f3787696 Sofia Papagiannaki
        r = self.post('/v1/%s?update=' % self.user, **groups)
171 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 202)
172 f3787696 Sofia Papagiannaki
        return r
173 f3787696 Sofia Papagiannaki
174 f3787696 Sofia Papagiannaki
    def get_account_info(self, until=None):
175 f3787696 Sofia Papagiannaki
        url = '/v1/%s' % self.user
176 f3787696 Sofia Papagiannaki
        if until is not None:
177 f3787696 Sofia Papagiannaki
            parts = list(urlsplit(url))
178 f3787696 Sofia Papagiannaki
            parts[3] = urlencode({
179 f3787696 Sofia Papagiannaki
                'until': until
180 f3787696 Sofia Papagiannaki
            })
181 f3787696 Sofia Papagiannaki
            url = urlunsplit(parts)
182 f3787696 Sofia Papagiannaki
        r = self.head(url)
183 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
184 f3787696 Sofia Papagiannaki
        return r
185 f3787696 Sofia Papagiannaki
186 f3787696 Sofia Papagiannaki
    def get_account_meta(self, until=None):
187 f3787696 Sofia Papagiannaki
        r = self.get_account_info(until=until)
188 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
189 f3787696 Sofia Papagiannaki
        map(headers.pop,
190 f3787696 Sofia Papagiannaki
            [k for k in headers.keys()
191 f3787696 Sofia Papagiannaki
                if not k.startswith('X-Account-Meta-')])
192 f3787696 Sofia Papagiannaki
        return headers
193 f3787696 Sofia Papagiannaki
194 f3787696 Sofia Papagiannaki
    def get_account_groups(self, until=None):
195 f3787696 Sofia Papagiannaki
        r = self.get_account_info(until=until)
196 f3787696 Sofia Papagiannaki
        headers = dict(r._headers.values())
197 f3787696 Sofia Papagiannaki
        map(headers.pop,
198 f3787696 Sofia Papagiannaki
            [k for k in headers.keys()
199 f3787696 Sofia Papagiannaki
                if not k.startswith('X-Account-Group-')])
200 f3787696 Sofia Papagiannaki
        return headers
201 f3787696 Sofia Papagiannaki
202 f3787696 Sofia Papagiannaki
    def list_containers(self, format='json', headers={}, **params):
203 f3787696 Sofia Papagiannaki
        url = '/v1/%s' % self.user
204 f3787696 Sofia Papagiannaki
        parts = list(urlsplit(url))
205 f3787696 Sofia Papagiannaki
        params['format'] = format
206 f3787696 Sofia Papagiannaki
        parts[3] = urlencode(params)
207 f3787696 Sofia Papagiannaki
        url = urlunsplit(parts)
208 f3787696 Sofia Papagiannaki
        _headers = dict(('HTTP_%s' % k.upper(), str(v))
209 f3787696 Sofia Papagiannaki
                        for k, v in headers.items())
210 f3787696 Sofia Papagiannaki
        r = self.get(url, **_headers)
211 f3787696 Sofia Papagiannaki
212 f3787696 Sofia Papagiannaki
        if format is None:
213 f3787696 Sofia Papagiannaki
            containers = r.content.split('\n')
214 f3787696 Sofia Papagiannaki
            if '' in containers:
215 f3787696 Sofia Papagiannaki
                containers.remove('')
216 f3787696 Sofia Papagiannaki
            return containers
217 f3787696 Sofia Papagiannaki
        elif format == 'json':
218 f3787696 Sofia Papagiannaki
            try:
219 f3787696 Sofia Papagiannaki
                containers = json.loads(r.content)
220 f3787696 Sofia Papagiannaki
            except:
221 f3787696 Sofia Papagiannaki
                self.fail('json format expected')
222 f3787696 Sofia Papagiannaki
            return containers
223 f3787696 Sofia Papagiannaki
        elif format == 'xml':
224 f3787696 Sofia Papagiannaki
            return minidom.parseString(r.content)
225 f3787696 Sofia Papagiannaki
226 f3787696 Sofia Papagiannaki
    def delete_container_content(self, cname):
227 f3787696 Sofia Papagiannaki
        r = self.delete('/v1/%s/%s?delimiter=/' % (self.user, cname))
228 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
229 f3787696 Sofia Papagiannaki
        return r
230 f3787696 Sofia Papagiannaki
231 f3787696 Sofia Papagiannaki
    def delete_container(self, cname):
232 f3787696 Sofia Papagiannaki
        r = self.delete('/v1/%s/%s' % (self.user, cname))
233 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 204)
234 f3787696 Sofia Papagiannaki
        return r
235 f3787696 Sofia Papagiannaki
236 f3787696 Sofia Papagiannaki
    def create_container(self, cname):
237 f3787696 Sofia Papagiannaki
        r = self.put('/v1/%s/%s' % (self.user, cname), data='')
238 f3787696 Sofia Papagiannaki
        self.assertTrue(r.status_code in (202, 201))
239 f3787696 Sofia Papagiannaki
        return r
240 f3787696 Sofia Papagiannaki
241 f3787696 Sofia Papagiannaki
    def upload_object(self, cname, oname=None, **meta):
242 f3787696 Sofia Papagiannaki
        oname = oname or get_random_word(8)
243 f3787696 Sofia Papagiannaki
        data = get_random_word(length=random.randint(1, 1024))
244 f3787696 Sofia Papagiannaki
        headers = dict(('HTTP_X_OBJECT_META_%s' % k.upper(), v)
245 f3787696 Sofia Papagiannaki
                       for k, v in meta.iteritems())
246 f3787696 Sofia Papagiannaki
        r = self.put('/v1/%s/%s/%s' % (
247 f3787696 Sofia Papagiannaki
            self.user, cname, oname), data=data, **headers)
248 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
249 f3787696 Sofia Papagiannaki
        return oname, data, r
250 f3787696 Sofia Papagiannaki
251 f3787696 Sofia Papagiannaki
    def create_folder(self, cname, oname=get_random_word(8), **headers):
252 f3787696 Sofia Papagiannaki
        r = self.put('/v1/%s/%s/%s' % (
253 f3787696 Sofia Papagiannaki
            self.user, cname, oname), data='',
254 f3787696 Sofia Papagiannaki
            content_type='application/directory',
255 f3787696 Sofia Papagiannaki
            **headers)
256 f3787696 Sofia Papagiannaki
        self.assertEqual(r.status_code, 201)
257 f3787696 Sofia Papagiannaki
        return oname, r
258 f3787696 Sofia Papagiannaki
259 f3787696 Sofia Papagiannaki
    def list_objects(self, cname):
260 f3787696 Sofia Papagiannaki
        r = self.get('/v1/%s/%s?format=json' % (self.user, cname))
261 f3787696 Sofia Papagiannaki
        self.assertTrue(r.status_code in (200, 204))
262 f3787696 Sofia Papagiannaki
        try:
263 f3787696 Sofia Papagiannaki
            objects = json.loads(r.content)
264 f3787696 Sofia Papagiannaki
        except:
265 f3787696 Sofia Papagiannaki
            self.fail('json format expected')
266 f3787696 Sofia Papagiannaki
        return objects
267 f3787696 Sofia Papagiannaki
268 f3787696 Sofia Papagiannaki
    def assert_status(self, status, codes):
269 f3787696 Sofia Papagiannaki
        l = [elem for elem in return_codes]
270 f3787696 Sofia Papagiannaki
        if isinstance(codes, list):
271 f3787696 Sofia Papagiannaki
            l.extend(codes)
272 f3787696 Sofia Papagiannaki
        else:
273 f3787696 Sofia Papagiannaki
            l.append(codes)
274 f3787696 Sofia Papagiannaki
        self.assertTrue(status in l)
275 f3787696 Sofia Papagiannaki
276 f3787696 Sofia Papagiannaki
    def assert_extended(self, data, format, type, size=10000):
277 f3787696 Sofia Papagiannaki
        if format == 'xml':
278 f3787696 Sofia Papagiannaki
            self._assert_xml(data, type, size)
279 f3787696 Sofia Papagiannaki
        elif format == 'json':
280 f3787696 Sofia Papagiannaki
            self._assert_json(data, type, size)
281 f3787696 Sofia Papagiannaki
282 f3787696 Sofia Papagiannaki
    def _assert_json(self, data, type, size):
283 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
284 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
285 f3787696 Sofia Papagiannaki
        self.assertTrue(len(data) <= size)
286 f3787696 Sofia Papagiannaki
        for item in info:
287 f3787696 Sofia Papagiannaki
            for i in data:
288 f3787696 Sofia Papagiannaki
                if 'subdir' in i.keys():
289 f3787696 Sofia Papagiannaki
                    continue
290 f3787696 Sofia Papagiannaki
                self.assertTrue(item in i.keys())
291 f3787696 Sofia Papagiannaki
292 f3787696 Sofia Papagiannaki
    def _assert_xml(self, data, type, size):
293 f3787696 Sofia Papagiannaki
        convert = lambda s: s.lower()
294 f3787696 Sofia Papagiannaki
        info = [convert(elem) for elem in details[type]]
295 f3787696 Sofia Papagiannaki
        try:
296 f3787696 Sofia Papagiannaki
            info.remove('content_encoding')
297 f3787696 Sofia Papagiannaki
        except ValueError:
298 f3787696 Sofia Papagiannaki
            pass
299 f3787696 Sofia Papagiannaki
        xml = data
300 f3787696 Sofia Papagiannaki
        entities = xml.getElementsByTagName(type)
301 f3787696 Sofia Papagiannaki
        self.assertTrue(len(entities) <= size)
302 f3787696 Sofia Papagiannaki
        for e in entities:
303 f3787696 Sofia Papagiannaki
            for item in info:
304 f3787696 Sofia Papagiannaki
                self.assertTrue(e.getElementsByTagName(item))
305 f3787696 Sofia Papagiannaki
306 f3787696 Sofia Papagiannaki
307 f3787696 Sofia Papagiannaki
class AssertMappingInvariant(object):
308 f3787696 Sofia Papagiannaki
    def __init__(self, callable, *args, **kwargs):
309 f3787696 Sofia Papagiannaki
        self.callable = callable
310 f3787696 Sofia Papagiannaki
        self.args = args
311 f3787696 Sofia Papagiannaki
        self.kwargs = kwargs
312 f3787696 Sofia Papagiannaki
313 f3787696 Sofia Papagiannaki
    def __enter__(self):
314 f3787696 Sofia Papagiannaki
        self.map = self.callable(*self.args, **self.kwargs)
315 f3787696 Sofia Papagiannaki
        return self.map
316 f3787696 Sofia Papagiannaki
317 f3787696 Sofia Papagiannaki
    def __exit__(self, type, value, tb):
318 f3787696 Sofia Papagiannaki
        map = self.callable(*self.args, **self.kwargs)
319 f3787696 Sofia Papagiannaki
        for k, v in self.map.items():
320 f3787696 Sofia Papagiannaki
            if is_date(v):
321 f3787696 Sofia Papagiannaki
                continue
322 f3787696 Sofia Papagiannaki
323 f3787696 Sofia Papagiannaki
            assert(k in map), '%s not in map' % k
324 f3787696 Sofia Papagiannaki
            assert v == map[k]
325 f3787696 Sofia Papagiannaki
326 f3787696 Sofia Papagiannaki
django_sqlalchemy_engines = {
327 f3787696 Sofia Papagiannaki
    'django.db.backends.postgresql_psycopg2': 'postgresql+psycopg2',
328 f3787696 Sofia Papagiannaki
    'django.db.backends.postgresql': 'postgresql',
329 f3787696 Sofia Papagiannaki
    'django.db.backends.mysql': '',
330 f3787696 Sofia Papagiannaki
    'django.db.backends.sqlite3': 'mssql',
331 f3787696 Sofia Papagiannaki
    'django.db.backends.oracle': 'oracle'}
332 f3787696 Sofia Papagiannaki
333 f3787696 Sofia Papagiannaki
334 f3787696 Sofia Papagiannaki
def construct_db_connection():
335 f3787696 Sofia Papagiannaki
    """Convert the django default database to an sqlalchemy connection
336 f3787696 Sofia Papagiannaki
       string"""
337 f3787696 Sofia Papagiannaki
    db = settings.DATABASES['default']
338 f3787696 Sofia Papagiannaki
    if db['ENGINE'] == 'django.db.backends.sqlite3':
339 f3787696 Sofia Papagiannaki
        return 'sqlite://'
340 f3787696 Sofia Papagiannaki
    else:
341 f3787696 Sofia Papagiannaki
        d = dict(scheme=django_sqlalchemy_engines.get(db['ENGINE']),
342 f3787696 Sofia Papagiannaki
                 user=db['USER'],
343 f3787696 Sofia Papagiannaki
                 pwd=db['PASSWORD'],
344 f3787696 Sofia Papagiannaki
                 host=db['HOST'].lower(),
345 f3787696 Sofia Papagiannaki
                 port=int(db['PORT']) if db['PORT'] != '' else '',
346 f3787696 Sofia Papagiannaki
                 name=db['NAME'])
347 f3787696 Sofia Papagiannaki
        return '%(scheme)s://%(user)s:%(pwd)s@%(host)s:%(port)s/%(name)s' % d
348 f3787696 Sofia Papagiannaki
349 f3787696 Sofia Papagiannaki
350 f3787696 Sofia Papagiannaki
def is_date(date):
351 f3787696 Sofia Papagiannaki
    __D = r'(?P<day>\d{2})'
352 f3787696 Sofia Papagiannaki
    __D2 = r'(?P<day>[ \d]\d)'
353 f3787696 Sofia Papagiannaki
    __M = r'(?P<mon>\w{3})'
354 f3787696 Sofia Papagiannaki
    __Y = r'(?P<year>\d{4})'
355 f3787696 Sofia Papagiannaki
    __Y2 = r'(?P<year>\d{2})'
356 f3787696 Sofia Papagiannaki
    __T = r'(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})'
357 f3787696 Sofia Papagiannaki
    RFC1123_DATE = re.compile(r'^\w{3}, %s %s %s %s GMT$' % (
358 f3787696 Sofia Papagiannaki
        __D, __M, __Y, __T))
359 f3787696 Sofia Papagiannaki
    RFC850_DATE = re.compile(r'^\w{6,9}, %s-%s-%s %s GMT$' % (
360 f3787696 Sofia Papagiannaki
        __D, __M, __Y2, __T))
361 f3787696 Sofia Papagiannaki
    ASCTIME_DATE = re.compile(r'^\w{3} %s %s %s %s$' % (
362 f3787696 Sofia Papagiannaki
        __M, __D2, __T, __Y))
363 f3787696 Sofia Papagiannaki
    for regex in RFC1123_DATE, RFC850_DATE, ASCTIME_DATE:
364 f3787696 Sofia Papagiannaki
        m = regex.match(date)
365 f3787696 Sofia Papagiannaki
        if m is not None:
366 f3787696 Sofia Papagiannaki
            return True
367 f3787696 Sofia Papagiannaki
    return False
368 f3787696 Sofia Papagiannaki
369 f3787696 Sofia Papagiannaki
370 f3787696 Sofia Papagiannaki
def strnextling(prefix):
371 f3787696 Sofia Papagiannaki
    """Return the first unicode string
372 f3787696 Sofia Papagiannaki
       greater than but not starting with given prefix.
373 f3787696 Sofia Papagiannaki
       strnextling('hello') -> 'hellp'
374 f3787696 Sofia Papagiannaki
    """
375 f3787696 Sofia Papagiannaki
    if not prefix:
376 f3787696 Sofia Papagiannaki
        ## all strings start with the null string,
377 f3787696 Sofia Papagiannaki
        ## therefore we have to approximate strnextling('')
378 f3787696 Sofia Papagiannaki
        ## with the last unicode character supported by python
379 f3787696 Sofia Papagiannaki
        ## 0x10ffff for wide (32-bit unicode) python builds
380 f3787696 Sofia Papagiannaki
        ## 0x00ffff for narrow (16-bit unicode) python builds
381 f3787696 Sofia Papagiannaki
        ## We will not autodetect. 0xffff is safe enough.
382 f3787696 Sofia Papagiannaki
        return unichr(0xffff)
383 f3787696 Sofia Papagiannaki
    s = prefix[:-1]
384 f3787696 Sofia Papagiannaki
    c = ord(prefix[-1])
385 f3787696 Sofia Papagiannaki
    if c >= 0xffff:
386 f3787696 Sofia Papagiannaki
        raise RuntimeError
387 f3787696 Sofia Papagiannaki
    s += unichr(c + 1)
388 f3787696 Sofia Papagiannaki
    return s
389 f3787696 Sofia Papagiannaki
390 f3787696 Sofia Papagiannaki
391 f3787696 Sofia Papagiannaki
def test_concurrently(times=2):
392 f3787696 Sofia Papagiannaki
    """
393 f3787696 Sofia Papagiannaki
    Add this decorator to small pieces of code that you want to test
394 f3787696 Sofia Papagiannaki
    concurrently to make sure they don't raise exceptions when run at the
395 f3787696 Sofia Papagiannaki
    same time.  E.g., some Django views that do a SELECT and then a subsequent
396 f3787696 Sofia Papagiannaki
    INSERT might fail when the INSERT assumes that the data has not changed
397 f3787696 Sofia Papagiannaki
    since the SELECT.
398 f3787696 Sofia Papagiannaki
    """
399 f3787696 Sofia Papagiannaki
    def test_concurrently_decorator(test_func):
400 f3787696 Sofia Papagiannaki
        def wrapper(*args, **kwargs):
401 f3787696 Sofia Papagiannaki
            exceptions = []
402 f3787696 Sofia Papagiannaki
403 f3787696 Sofia Papagiannaki
            def call_test_func():
404 f3787696 Sofia Papagiannaki
                try:
405 f3787696 Sofia Papagiannaki
                    test_func(*args, **kwargs)
406 f3787696 Sofia Papagiannaki
                except Exception, e:
407 f3787696 Sofia Papagiannaki
                    exceptions.append(e)
408 f3787696 Sofia Papagiannaki
                    raise
409 f3787696 Sofia Papagiannaki
410 f3787696 Sofia Papagiannaki
            threads = []
411 f3787696 Sofia Papagiannaki
            for i in range(times):
412 f3787696 Sofia Papagiannaki
                threads.append(threading.Thread())
413 f3787696 Sofia Papagiannaki
            for t in threads:
414 f3787696 Sofia Papagiannaki
                t.start()
415 f3787696 Sofia Papagiannaki
            for t in threads:
416 f3787696 Sofia Papagiannaki
                t.join()
417 f3787696 Sofia Papagiannaki
            if exceptions:
418 f3787696 Sofia Papagiannaki
                raise Exception(
419 f3787696 Sofia Papagiannaki
                    ('test_concurrently intercepted %s',
420 f3787696 Sofia Papagiannaki
                     'exceptions: %s') % (len(exceptions), exceptions))
421 f3787696 Sofia Papagiannaki
        return wrapper
422 f3787696 Sofia Papagiannaki
    return test_concurrently_decorator