Statistics
| Branch: | Tag: | Revision:

root / snf-django-lib / snf_django / utils / testing.py @ 2e90e666

History | View | Annotate | Download (11.2 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
from contextlib import contextmanager
36
from django.test import TestCase
37
from django.utils import simplejson as json
38
from django.utils.encoding import smart_unicode
39
from mock import patch
40

    
41

    
42
@contextmanager
43
def override_settings(settings, **kwargs):
44
    """
45
    Helper context manager to override django settings within the provided
46
    context.
47

48
    All keyword arguments provided are set to the django settings object and
49
    get reverted/removed when the manager exits.
50

51
    >>> from synnefo.util.testing import override_settings
52
    >>> from django.conf import settings
53
    >>> with override_settings(settings, DEBUG=True):
54
    ...     assert settings.DEBUG == True
55

56
    The special arguemnt ``prefix`` can be set to prefix all setting keys with
57
    the provided value.
58

59
    >>> from django.conf import settings
60
    >>> from django.core import mail
61
    >>> with override_settings(settings, CONTACT_EMAILS=['kpap@grnet.gr'],
62
    ...                        prefix='MYAPP_'):
63
    ...     from django.core.mail import send_mail
64
    ...     send_mail("hello", "I love you kpap", settings.DEFAULT_FROM_EMAIL,
65
    ...               settings.MYAPP_CONTACT_EMAILS)
66
    ...     assert 'kpap@grnet.gr' in mail.mailbox[0].recipients()
67

68
    If you plan to reuse it
69

70
    >>> import functools
71
    >>> from synnefo.util.testing import override_settings
72
    >>> from django.conf import settings
73
    >>> myapp_settings = functools.partial(override_settings, prefix='MYAPP_')
74
    >>> with myapp_settings(CONTACT_EMAILS=['kpap@grnet.gr']):
75
    ...     assert settings.MYAPP_CONTACT_EMAILS == ['kpap@grnet.gr']
76

77
    """
78

    
79
    _prefix = kwargs.get('prefix', '')
80
    prefix = lambda key: '%s%s' % (_prefix, key)
81

    
82
    oldkeys = [k for k in dir(settings) if k.upper() == k]
83
    oldsettings = dict([(k, getattr(settings, k)) for k in oldkeys])
84

    
85
    toremove = []
86
    for key, value in kwargs.iteritems():
87
        key = prefix(key)
88
        if not hasattr(settings, key):
89
            toremove.append(key)
90
        setattr(settings, key, value)
91

    
92
    yield
93

    
94
    # Remove keys that didn't exist
95
    for key in toremove:
96
        delattr(settings, key)
97

    
98
    # Remove keys that added during the execution of the context
99
    if kwargs.get('reset_changes', True):
100
        newkeys = [k for k in dir(settings) if k.upper() == k]
101
        for key in newkeys:
102
            if not key in oldkeys:
103
                delattr(settings, key)
104

    
105
    # Revert old keys
106
    for key in oldkeys:
107
        if key == key.upper():
108
            setattr(settings, key, oldsettings.get(key))
109

    
110

    
111
def with_settings(settings, prefix='', **override):
112
    def wrapper(func):
113
        def inner(*args, **kwargs):
114
            with override_settings(settings, prefix=prefix, **override):
115
                ret = func(*args, **kwargs)
116
            return ret
117
        return inner
118
    return wrapper
119

    
120
serial = 0
121

    
122

    
123
@contextmanager
124
def astakos_user(user):
125
    """
126
    Context manager to mock astakos response.
127

128
    usage:
129
    with astakos_user("user@user.com"):
130
        .... make api calls ....
131

132
    """
133
    with patch("snf_django.lib.api.get_token") as get_token:
134
        get_token.return_value = "DummyToken"
135
        with patch('astakosclient.AstakosClient.authenticate') as m2:
136
            m2.return_value = {"access": {
137
                "token": {
138
                    "expires": "2013-06-19T15:23:59.975572+00:00",
139
                    "id": "DummyToken",
140
                    "tenant": {
141
                        "id": smart_unicode(user, encoding='utf-8'),
142
                        "name": "Firstname Lastname"
143
                        }
144
                    },
145
                "serviceCatalog": [],
146
                "user": {
147
                    "roles_links": [],
148
                    "id": smart_unicode(user, encoding='utf-8'),
149
                    "roles": [{"id": 1, "name": "default"}],
150
                    "name": "Firstname Lastname"}}
151
                }
152

    
153
            with patch('astakosclient.AstakosClient.service_get_quotas') as m2:
154
                m2.return_value = {user: {
155
                    "system": {
156
                        "pithos.diskspace": {
157
                            "usage": 0,
158
                            "limit": 1073741824,  # 1GB
159
                            "pending": 0
160
                            }
161
                        }
162
                    }
163
                }
164
                issue_fun = \
165
                    "astakosclient.AstakosClient.issue_one_commission"
166
                with patch(issue_fun) as m3:
167
                    serials = []
168
                    append = serials.append
169

    
170
                    def get_serial(*args, **kwargs):
171
                        global serial
172
                        serial += 1
173
                        append(serial)
174
                        return serial
175

    
176
                    m3.side_effect = get_serial
177
                    resolv_fun = \
178
                        'astakosclient.AstakosClient.resolve_commissions'
179
                    with patch(resolv_fun) as m4:
180
                        m4.return_value = {'accepted': serials,
181
                                           'rejected': [],
182
                                           'failed': []}
183
                        users_fun = \
184
                            'astakosclient.AstakosClient.get_usernames'
185
                        with patch(users_fun) as m5:
186

    
187
                            def get_usernames(*args, **kwargs):
188
                                uuids = args[-1]
189
                                return dict((uuid, uuid) for uuid in uuids)
190

    
191
                            m5.side_effect = get_usernames
192
                            yield
193

    
194

    
195
serial = 0
196

    
197

    
198
@contextmanager
199
def mocked_quotaholder(success=True):
200
    with patch("synnefo.quotas.Quotaholder.get") as astakos:
201
        global serial
202
        serial += 10
203

    
204
        def foo(*args, **kwargs):
205
            return (len(astakos.return_value.issue_one_commission.mock_calls) +
206
                    serial)
207
        astakos.return_value.issue_one_commission.side_effect = foo
208

    
209
        def resolve_mock(*args, **kwargs):
210
            return {"failed": [],
211
                    "accepted": args[0],
212
                    "rejected": args[1],
213
                    }
214
        astakos.return_value.resolve_commissions.side_effect = resolve_mock
215
        yield astakos.return_value
216

    
217

    
218
class BaseAPITest(TestCase):
219
    def get(self, url, user='user', *args, **kwargs):
220
        with astakos_user(user):
221
            with mocked_quotaholder():
222
                response = self.client.get(url, *args, **kwargs)
223
        return response
224

    
225
    def delete(self, url, user='user'):
226
        with astakos_user(user):
227
            with mocked_quotaholder() as m:
228
                self.mocked_quotaholder = m
229
                response = self.client.delete(url)
230
        return response
231

    
232
    def post(self, url, user='user', params={}, ctype='json', *args, **kwargs):
233
        if ctype == 'json':
234
            content_type = 'application/json'
235
        with astakos_user(user):
236
            with mocked_quotaholder() as m:
237
                self.mocked_quotaholder = m
238
                response = self.client.post(url, params,
239
                                            content_type=content_type,
240
                                            *args, **kwargs)
241
        return response
242

    
243
    def put(self, url, user='user', params={}, ctype='json', *args, **kwargs):
244
        if ctype == 'json':
245
            content_type = 'application/json'
246
        with astakos_user(user):
247
            with mocked_quotaholder() as m:
248
                self.mocked_quotaholder = m
249
                response = self.client.put(url, params,
250
                                           content_type=content_type,
251
                                           *args, **kwargs)
252
        return response
253

    
254
    def assertSuccess(self, response):
255
        self.assertTrue(response.status_code in [200, 202, 203, 204],
256
                        msg=response.content)
257

    
258
    def assertSuccess201(self, response):
259
        self.assertEqual(response.status_code, 201, msg=response.content)
260

    
261
    def assertFault(self, response, status_code, name, msg=''):
262
        self.assertEqual(response.status_code, status_code,
263
                         msg=msg)
264
        fault = json.loads(response.content)
265
        self.assertEqual(fault.keys(), [name])
266

    
267
    def assertBadRequest(self, response):
268
        self.assertFault(response, 400, 'badRequest', msg=response.content)
269

    
270
    def assertConflict(self, response):
271
        self.assertFault(response, 409, 'conflict', msg=response.content)
272

    
273
    def assertItemNotFound(self, response):
274
        self.assertFault(response, 404, 'itemNotFound', msg=response.content)
275

    
276
    def assertMethodNotAllowed(self, response):
277
        self.assertFault(response, 405, 'notAllowed', msg=response.content)
278
        self.assertTrue('Allow' in response)
279
        try:
280
            error = json.loads(response.content)
281
        except ValueError:
282
            self.assertTrue(False)
283
        self.assertEqual(error['notAllowed']['message'], 'Method not allowed')
284

    
285

    
286
# Imitate unittest assertions new in Python 2.7
287

    
288
class _AssertRaisesContext(object):
289
    """
290
    A context manager used to implement TestCase.assertRaises* methods.
291
    Adapted from unittest2.
292
    """
293

    
294
    def __init__(self, expected):
295
        self.expected = expected
296

    
297
    def __enter__(self):
298
        return self
299

    
300
    def __exit__(self, exc_type, exc_value, tb):
301
        if exc_type is None:
302
            try:
303
                exc_name = self.expected.__name__
304
            except AttributeError:
305
                exc_name = str(self.expected)
306
            raise AssertionError(
307
                "%s not raised" % (exc_name,))
308
        if not issubclass(exc_type, self.expected):
309
            # let unexpected exceptions pass through
310
            return False
311
        self.exception = exc_value  # store for later retrieval
312
        return True
313

    
314

    
315
def assertRaises(excClass):
316
    return _AssertRaisesContext(excClass)
317

    
318

    
319
def assertGreater(x, y):
320
    assert x > y
321

    
322

    
323
def assertIn(x, y):
324
    assert x in y