Statistics
| Branch: | Tag: | Revision:

root / snf-django-lib / snf_django / utils / testing.py @ 14402edc

History | View | Annotate | Download (11.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
from contextlib import contextmanager
36
from django.test import TestCase
37
from django.utils import simplejson as json
38
from synnefo.util import text
39
from mock import patch
40

    
41

    
42
@contextmanager
43
def override_settings(settings, **kwargs):
44
    """
45
    Helper context manager to override django settings within the provided
46
    context.
47

48
    All keyword arguments provided are set to the django settings object and
49
    get reverted/removed when the manager exits.
50

51
    >>> from synnefo.util.testing import override_settings
52
    >>> from django.conf import settings
53
    >>> with override_settings(settings, DEBUG=True):
54
    ...     assert settings.DEBUG == True
55

56
    The special arguemnt ``prefix`` can be set to prefix all setting keys with
57
    the provided value.
58

59
    >>> from django.conf import settings
60
    >>> from django.core import mail
61
    >>> with override_settings(settings, CONTACT_EMAILS=['kpap@grnet.gr'],
62
    ...                        prefix='MYAPP_'):
63
    ...     from django.core.mail import send_mail
64
    ...     send_mail("hello", "I love you kpap", settings.DEFAULT_FROM_EMAIL,
65
    ...               settings.MYAPP_CONTACT_EMAILS)
66
    ...     assert 'kpap@grnet.gr' in mail.mailbox[0].recipients()
67

68
    If you plan to reuse it
69

70
    >>> import functools
71
    >>> from synnefo.util.testing import override_settings
72
    >>> from django.conf import settings
73
    >>> myapp_settings = functools.partial(override_settings, prefix='MYAPP_')
74
    >>> with myapp_settings(CONTACT_EMAILS=['kpap@grnet.gr']):
75
    ...     assert settings.MYAPP_CONTACT_EMAILS == ['kpap@grnet.gr']
76

77
    """
78

    
79
    _prefix = kwargs.get('prefix', '')
80
    prefix = lambda key: '%s%s' % (_prefix, key)
81

    
82
    oldkeys = [k for k in dir(settings) if k.upper() == k]
83
    oldsettings = dict([(k, getattr(settings, k)) for k in oldkeys])
84

    
85
    toremove = []
86
    for key, value in kwargs.iteritems():
87
        key = prefix(key)
88
        if not hasattr(settings, key):
89
            toremove.append(key)
90
        setattr(settings, key, value)
91

    
92
    yield
93

    
94
    # Remove keys that didn't exist
95
    for key in toremove:
96
        delattr(settings, key)
97

    
98
    # Remove keys that added during the execution of the context
99
    if kwargs.get('reset_changes', True):
100
        newkeys = [k for k in dir(settings) if k.upper() == k]
101
        for key in newkeys:
102
            if not key in oldkeys:
103
                delattr(settings, key)
104

    
105
    # Revert old keys
106
    for key in oldkeys:
107
        if key == key.upper():
108
            setattr(settings, key, oldsettings.get(key))
109

    
110

    
111
def with_settings(settings, prefix='', **override):
112
    def wrapper(func):
113
        def inner(*args, **kwargs):
114
            with override_settings(settings, prefix=prefix, **override):
115
                ret = func(*args, **kwargs)
116
            return ret
117
        return inner
118
    return wrapper
119

    
120
serial = 0
121

    
122

    
123
@contextmanager
124
def astakos_user(user):
125
    """
126
    Context manager to mock astakos response.
127

128
    usage:
129
    with astakos_user("user@user.com"):
130
        .... make api calls ....
131

132
    """
133
    with patch("snf_django.lib.api.get_token") as get_token:
134
        get_token.return_value = "DummyToken"
135
        with patch('astakosclient.AstakosClient.authenticate') as m2:
136
            m2.return_value = {"access": {
137
                "token": {
138
                    "expires": "2013-06-19T15:23:59.975572+00:00",
139
                    "id": "DummyToken",
140
                    "tenant": {
141
                        "id": text.udec(user, 'utf8'),
142
                        "name": "Firstname Lastname"
143
                        }
144
                    },
145
                "serviceCatalog": [],
146
                "user": {
147
                    "roles_links": [],
148
                    "id": text.udec(user, 'utf8'),
149
                    "roles": [{"id": 1, "name": "default"}],
150
                    "name": "Firstname Lastname"}}
151
                }
152

    
153
            with patch('astakosclient.AstakosClient.get_quotas') as m3:
154
                m3.return_value = {
155
                    "system": {
156
                        "pithos.diskspace": {
157
                            "usage": 0,
158
                            "limit": 1073741824,  # 1GB
159
                            "pending": 0
160
                        }
161
                    }
162
                }
163
                issue_fun = \
164
                    "astakosclient.AstakosClient.issue_one_commission"
165
                with patch(issue_fun) as m3:
166
                    serials = []
167
                    append = serials.append
168

    
169
                    def get_serial(*args, **kwargs):
170
                        global serial
171
                        serial += 1
172
                        append(serial)
173
                        return serial
174

    
175
                    m3.side_effect = get_serial
176
                    resolv_fun = \
177
                        'astakosclient.AstakosClient.resolve_commissions'
178
                    with patch(resolv_fun) as m4:
179
                        m4.return_value = {'accepted': serials,
180
                                           'rejected': [],
181
                                           'failed': []}
182
                        users_fun = \
183
                            'astakosclient.AstakosClient.get_usernames'
184
                        with patch(users_fun) as m5:
185

    
186
                            def get_usernames(*args, **kwargs):
187
                                uuids = args[-1]
188
                                return dict((uuid, uuid) for uuid in uuids)
189

    
190
                            m5.side_effect = get_usernames
191
                            yield
192

    
193

    
194
serial = 0
195

    
196

    
197
@contextmanager
198
def mocked_quotaholder(success=True):
199
    with patch("synnefo.quotas.Quotaholder.get") as astakos:
200
        global serial
201
        serial += 10
202

    
203
        def foo(*args, **kwargs):
204
            return (len(astakos.return_value.issue_one_commission.mock_calls) +
205
                    serial)
206
        astakos.return_value.issue_one_commission.side_effect = foo
207
        def resolve_mock(*args, **kwargs):
208
            return {"failed": [],
209
                    "accepted": args[0],
210
                    "rejected": args[1],
211
                    }
212
        astakos.return_value.resolve_commissions.side_effect = resolve_mock
213
        yield astakos.return_value
214

    
215

    
216
class BaseAPITest(TestCase):
217
    def get(self, url, user='user', *args, **kwargs):
218
        with astakos_user(user):
219
            with mocked_quotaholder():
220
                response = self.client.get(url, *args, **kwargs)
221
        return response
222

    
223
    def delete(self, url, user='user'):
224
        with astakos_user(user):
225
            with mocked_quotaholder() as m:
226
                self.mocked_quotaholder = m
227
                response = self.client.delete(url)
228
        return response
229

    
230
    def post(self, url, user='user', params={}, ctype='json', *args, **kwargs):
231
        if ctype == 'json':
232
            content_type = 'application/json'
233
        with astakos_user(user):
234
            with mocked_quotaholder() as m:
235
                self.mocked_quotaholder = m
236
                response = self.client.post(url, params,
237
                                            content_type=content_type,
238
                                            *args, **kwargs)
239
        return response
240

    
241
    def put(self, url, user='user', params={}, ctype='json', *args, **kwargs):
242
        if ctype == 'json':
243
            content_type = 'application/json'
244
        with astakos_user(user):
245
            with mocked_quotaholder() as m:
246
                self.mocked_quotaholder = m
247
                response = self.client.put(url, params,
248
                                           content_type=content_type,
249
                                           *args, **kwargs)
250
        return response
251

    
252
    def assertSuccess(self, response):
253
        self.assertTrue(response.status_code in [200, 202, 203, 204],
254
                        msg=response.content)
255

    
256
    def assertSuccess201(self, response):
257
        self.assertEqual(response.status_code, 201, msg=response.content)
258

    
259
    def assertFault(self, response, status_code, name, msg=''):
260
        self.assertEqual(response.status_code, status_code,
261
                         msg=msg)
262
        fault = json.loads(response.content)
263
        self.assertEqual(fault.keys(), [name])
264

    
265
    def assertBadRequest(self, response):
266
        self.assertFault(response, 400, 'badRequest', msg=response.content)
267

    
268
    def assertConflict(self, response):
269
        self.assertFault(response, 409, 'conflict', msg=response.content)
270

    
271
    def assertItemNotFound(self, response):
272
        self.assertFault(response, 404, 'itemNotFound', msg=response.content)
273

    
274
    def assertMethodNotAllowed(self, response):
275
        self.assertFault(response, 405, 'notAllowed', msg=response.content)
276
        self.assertTrue('Allow' in response)
277
        try:
278
            error = json.loads(response.content)
279
        except ValueError:
280
            self.assertTrue(False)
281
        self.assertEqual(error['notAllowed']['message'], 'Method not allowed')
282

    
283

    
284
# Imitate unittest assertions new in Python 2.7
285

    
286
class _AssertRaisesContext(object):
287
    """
288
    A context manager used to implement TestCase.assertRaises* methods.
289
    Adapted from unittest2.
290
    """
291

    
292
    def __init__(self, expected):
293
        self.expected = expected
294

    
295
    def __enter__(self):
296
        return self
297

    
298
    def __exit__(self, exc_type, exc_value, tb):
299
        if exc_type is None:
300
            try:
301
                exc_name = self.expected.__name__
302
            except AttributeError:
303
                exc_name = str(self.expected)
304
            raise AssertionError(
305
                "%s not raised" % (exc_name,))
306
        if not issubclass(exc_type, self.expected):
307
            # let unexpected exceptions pass through
308
            return False
309
        self.exception = exc_value  # store for later retrieval
310
        return True
311

    
312

    
313
def assertRaises(excClass):
314
    return _AssertRaisesContext(excClass)
315

    
316

    
317
def assertGreater(x, y):
318
    assert x > y
319

    
320

    
321
def assertIn(x, y):
322
    assert x in y