Statistics
| Branch: | Tag: | Revision:

root / snf-django-lib / snf_django / utils / testing.py @ 23808592

History | View | Annotate | Download (11.4 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
from contextlib import contextmanager
36
from django.test import TestCase
37
from django.utils import simplejson as json
38
from synnefo.util import text
39
from mock import patch
40

    
41

    
42
@contextmanager
43
def override_settings(settings, **kwargs):
44
    """
45
    Helper context manager to override django settings within the provided
46
    context.
47

48
    All keyword arguments provided are set to the django settings object and
49
    get reverted/removed when the manager exits.
50

51
    >>> from synnefo.util.testing import override_settings
52
    >>> from django.conf import settings
53
    >>> with override_settings(settings, DEBUG=True):
54
    ...     assert settings.DEBUG == True
55

56
    The special arguemnt ``prefix`` can be set to prefix all setting keys with
57
    the provided value.
58

59
    >>> from django.conf import settings
60
    >>> from django.core import mail
61
    >>> with override_settings(settings, CONTACT_EMAILS=['kpap@grnet.gr'],
62
    ...                        prefix='MYAPP_'):
63
    ...     from django.core.mail import send_mail
64
    ...     send_mail("hello", "I love you kpap", settings.DEFAULT_FROM_EMAIL,
65
    ...               settings.MYAPP_CONTACT_EMAILS)
66
    ...     assert 'kpap@grnet.gr' in mail.mailbox[0].recipients()
67

68
    If you plan to reuse it
69

70
    >>> import functools
71
    >>> from synnefo.util.testing import override_settings
72
    >>> from django.conf import settings
73
    >>> myapp_settings = functools.partial(override_settings, prefix='MYAPP_')
74
    >>> with myapp_settings(CONTACT_EMAILS=['kpap@grnet.gr']):
75
    ...     assert settings.MYAPP_CONTACT_EMAILS == ['kpap@grnet.gr']
76

77
    """
78

    
79
    _prefix = kwargs.get('prefix', '')
80
    prefix = lambda key: '%s%s' % (_prefix, key)
81

    
82
    oldkeys = [k for k in dir(settings) if k.upper() == k]
83
    oldsettings = dict([(k, getattr(settings, k)) for k in oldkeys])
84

    
85
    toremove = []
86
    for key, value in kwargs.iteritems():
87
        key = prefix(key)
88
        if not hasattr(settings, key):
89
            toremove.append(key)
90
        setattr(settings, key, value)
91

    
92
    yield
93

    
94
    # Remove keys that didn't exist
95
    for key in toremove:
96
        delattr(settings, key)
97

    
98
    # Remove keys that added during the execution of the context
99
    if kwargs.get('reset_changes', True):
100
        newkeys = [k for k in dir(settings) if k.upper() == k]
101
        for key in newkeys:
102
            if not key in oldkeys:
103
                delattr(settings, key)
104

    
105
    # Revert old keys
106
    for key in oldkeys:
107
        if key == key.upper():
108
            setattr(settings, key, oldsettings.get(key))
109

    
110

    
111
def with_settings(settings, prefix='', **override):
112
    def wrapper(func):
113
        def inner(*args, **kwargs):
114
            with override_settings(settings, prefix=prefix, **override):
115
                ret = func(*args, **kwargs)
116
            return ret
117
        return inner
118
    return wrapper
119

    
120
serial = 0
121

    
122

    
123
@contextmanager
124
def astakos_user(user):
125
    """
126
    Context manager to mock astakos response.
127

128
    usage:
129
    with astakos_user("user@user.com"):
130
        .... make api calls ....
131

132
    """
133
    with patch("snf_django.lib.api.get_token") as get_token:
134
        get_token.return_value = "DummyToken"
135
        with patch('astakosclient.AstakosClient.authenticate') as m2:
136
            m2.return_value = {"access": {
137
                "token": {
138
                    "expires": "2013-06-19T15:23:59.975572+00:00",
139
                    "id": "DummyToken",
140
                    "tenant": {
141
                        "id": text.udec(user, 'utf8'),
142
                        "name": "Firstname Lastname"
143
                        }
144
                    },
145
                "serviceCatalog": [],
146
                "user": {
147
                    "roles_links": [],
148
                    "id": text.udec(user, 'utf8'),
149
                    "roles": [{"id": 1, "name": "default"}],
150
                    "name": "Firstname Lastname"}}
151
                }
152

    
153
            with patch('astakosclient.AstakosClient.service_get_quotas') as m2:
154
                m2.return_value = {user: {
155
                    "system": {
156
                        "pithos.diskspace": {
157
                            "usage": 0,
158
                            "limit": 1073741824,  # 1GB
159
                            "pending": 0
160
                            }
161
                        }
162
                    }
163
                }
164
                issue_fun = \
165
                    "astakosclient.AstakosClient.issue_one_commission"
166
                with patch(issue_fun) as m3:
167
                    serials = []
168
                    append = serials.append
169

    
170
                    def get_serial(*args, **kwargs):
171
                        global serial
172
                        serial += 1
173
                        append(serial)
174
                        return serial
175

    
176
                    m3.side_effect = get_serial
177
                    resolv_fun = \
178
                        'astakosclient.AstakosClient.resolve_commissions'
179
                    with patch(resolv_fun) as m4:
180
                        m4.return_value = {'accepted': serials,
181
                                           'rejected': [],
182
                                           'failed': []}
183
                        users_fun = \
184
                            'astakosclient.AstakosClient.get_usernames'
185
                        with patch(users_fun) as m5:
186

    
187
                            def get_usernames(*args, **kwargs):
188
                                uuids = args[-1]
189
                                return dict((uuid, uuid) for uuid in uuids)
190

    
191
                            m5.side_effect = get_usernames
192
                            yield
193

    
194

    
195
serial = 0
196

    
197

    
198
@contextmanager
199
def mocked_quotaholder(success=True):
200
    with patch("synnefo.quotas.Quotaholder.get") as astakos:
201
        global serial
202
        serial += 10
203

    
204
        def foo(*args, **kwargs):
205
            return (len(astakos.return_value.issue_one_commission.mock_calls) +
206
                    serial)
207
        astakos.return_value.issue_one_commission.side_effect = foo
208
        def resolve_mock(*args, **kwargs):
209
            return {"failed": [],
210
                    "accepted": args[0],
211
                    "rejected": args[1],
212
                    }
213
        astakos.return_value.resolve_commissions.side_effect = resolve_mock
214
        yield astakos.return_value
215

    
216

    
217
class BaseAPITest(TestCase):
218
    def get(self, url, user='user', *args, **kwargs):
219
        with astakos_user(user):
220
            with mocked_quotaholder():
221
                response = self.client.get(url, *args, **kwargs)
222
        return response
223

    
224
    def head(self, url, user='user', *args, **kwargs):
225
        with astakos_user(user):
226
            with mocked_quotaholder():
227
                response = self.client.head(url, *args, **kwargs)
228
        return response
229

    
230
    def delete(self, url, user='user'):
231
        with astakos_user(user):
232
            with mocked_quotaholder() as m:
233
                self.mocked_quotaholder = m
234
                response = self.client.delete(url)
235
        return response
236

    
237
    def post(self, url, user='user', params={}, ctype='json', *args, **kwargs):
238
        if ctype == 'json':
239
            content_type = 'application/json'
240
        with astakos_user(user):
241
            with mocked_quotaholder() as m:
242
                self.mocked_quotaholder = m
243
                response = self.client.post(url, params,
244
                                            content_type=content_type,
245
                                            *args, **kwargs)
246
        return response
247

    
248
    def put(self, url, user='user', params={}, ctype='json', *args, **kwargs):
249
        if ctype == 'json':
250
            content_type = 'application/json'
251
        with astakos_user(user):
252
            with mocked_quotaholder() as m:
253
                self.mocked_quotaholder = m
254
                response = self.client.put(url, params,
255
                                           content_type=content_type,
256
                                           *args, **kwargs)
257
        return response
258

    
259
    def assertSuccess(self, response):
260
        self.assertTrue(response.status_code in [200, 202, 203, 204],
261
                        msg=response.content)
262

    
263
    def assertSuccess201(self, response):
264
        self.assertEqual(response.status_code, 201, msg=response.content)
265

    
266
    def assertFault(self, response, status_code, name, msg=''):
267
        self.assertEqual(response.status_code, status_code,
268
                         msg=msg)
269
        fault = json.loads(response.content)
270
        self.assertEqual(fault.keys(), [name])
271

    
272
    def assertBadRequest(self, response):
273
        self.assertFault(response, 400, 'badRequest', msg=response.content)
274

    
275
    def assertConflict(self, response):
276
        self.assertFault(response, 409, 'conflict', msg=response.content)
277

    
278
    def assertItemNotFound(self, response):
279
        self.assertFault(response, 404, 'itemNotFound', msg=response.content)
280

    
281
    def assertMethodNotAllowed(self, response):
282
        self.assertFault(response, 405, 'notAllowed', msg=response.content)
283
        self.assertTrue('Allow' in response)
284
        try:
285
            error = json.loads(response.content)
286
        except ValueError:
287
            self.assertTrue(False)
288
        self.assertEqual(error['notAllowed']['message'], 'Method not allowed')
289

    
290

    
291
# Imitate unittest assertions new in Python 2.7
292

    
293
class _AssertRaisesContext(object):
294
    """
295
    A context manager used to implement TestCase.assertRaises* methods.
296
    Adapted from unittest2.
297
    """
298

    
299
    def __init__(self, expected):
300
        self.expected = expected
301

    
302
    def __enter__(self):
303
        return self
304

    
305
    def __exit__(self, exc_type, exc_value, tb):
306
        if exc_type is None:
307
            try:
308
                exc_name = self.expected.__name__
309
            except AttributeError:
310
                exc_name = str(self.expected)
311
            raise AssertionError(
312
                "%s not raised" % (exc_name,))
313
        if not issubclass(exc_type, self.expected):
314
            # let unexpected exceptions pass through
315
            return False
316
        self.exception = exc_value  # store for later retrieval
317
        return True
318

    
319

    
320
def assertRaises(excClass):
321
    return _AssertRaisesContext(excClass)
322

    
323

    
324
def assertGreater(x, y):
325
    assert x > y
326

    
327

    
328
def assertIn(x, y):
329
    assert x in y