Statistics
| Branch: | Tag: | Revision:

root / snf-django-lib / snf_django / utils / testing.py @ fde2c1f7

History | View | Annotate | Download (10 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
from contextlib import contextmanager
36
from django.test import TestCase
37
from django.utils import simplejson as json
38
from synnefo.util import text
39
from mock import patch
40

    
41

    
42
@contextmanager
43
def override_settings(settings, **kwargs):
44
    """
45
    Helper context manager to override django settings within the provided
46
    context.
47

48
    All keyword arguments provided are set to the django settings object and
49
    get reverted/removed when the manager exits.
50

51
    >>> from synnefo.util.testing import override_settings
52
    >>> from django.conf import settings
53
    >>> with override_settings(settings, DEBUG=True):
54
    >>>     assert settings.DEBUG == True
55

56
    The special arguemnt ``prefix`` can be set to prefix all setting keys with
57
    the provided value.
58

59
    >>> from django.conf import settings
60
    >>> from django.core import mail
61
    >>> with override_settings(settings, CONTACT_EMAILS=['kpap@grnet.gr'],
62
    >>>                        prefix='MYAPP_'):
63
    >>>     from django.core.mail import send_mail
64
    >>>     send_mail("hello", "I love you kpap", settings.DEFAULT_FROM_EMAIL,
65
    >>>               settings.MYAPP_CONTACT_EMAILS)
66
    >>>     assert 'kpap@grnet.gr' in mail.mailbox[0].recipients()
67

68
    If you plan to reuse it
69

70
    >>> import functools
71
    >>> from synnefo.util.testing import override_settings
72
    >>> from django.conf import settings
73
    >>> myapp_settings = functools.partial(override_settings, prefix='MYAPP_')
74
    >>> with myapp_settings(CONTACT_EMAILS=['kpap@grnet.gr'])
75
    >>>     assert settings.MYAPP_CONTACT_EMAILS == ['kpap@grnet.gr']
76

77
    """
78

    
79
    _prefix = kwargs.get('prefix', '')
80
    prefix = lambda key: '%s%s' % (_prefix, key)
81

    
82
    oldkeys = [k for k in dir(settings) if k.upper() == k]
83
    oldsettings = dict([(k, getattr(settings, k)) for k in oldkeys])
84

    
85
    toremove = []
86
    for key, value in kwargs.iteritems():
87
        key = prefix(key)
88
        if not hasattr(settings, key):
89
            toremove.append(key)
90
        setattr(settings, key, value)
91

    
92
    yield
93

    
94
    # Remove keys that didn't exist
95
    for key in toremove:
96
        delattr(settings, key)
97

    
98
    # Remove keys that added during the execution of the context
99
    if kwargs.get('reset_changes', True):
100
        newkeys = [k for k in dir(settings) if k.upper() == k]
101
        for key in newkeys:
102
            if not key in oldkeys:
103
                delattr(settings, key)
104

    
105
    # Revert old keys
106
    for key in oldkeys:
107
        if key == key.upper():
108
            setattr(settings, key, oldsettings.get(key))
109

    
110

    
111
def with_settings(settings, prefix='', **override):
112
    def wrapper(func):
113
        def inner(*args, **kwargs):
114
            with override_settings(settings, prefix=prefix, **override):
115
                ret = func(*args, **kwargs)
116
            return ret
117
        return inner
118
    return wrapper
119

    
120
serial = 0
121

    
122

    
123
@contextmanager
124
def astakos_user(user):
125
    """
126
    Context manager to mock astakos response.
127

128
    usage:
129
    with astakos_user("user@user.com"):
130
        .... make api calls ....
131

132
    """
133
    with patch("snf_django.lib.api.get_token") as get_token:
134
        get_token.return_value = "DummyToken"
135
        with patch('astakosclient.AstakosClient.get_user_info') as m:
136
            m.return_value = {"uuid": text.udec(user, 'utf8')}
137
            with patch('astakosclient.AstakosClient.get_quotas') as m2:
138
                m2.return_value = {
139
                    "system": {
140
                        "pithos.diskspace": {
141
                            "usage": 0,
142
                            "limit": 1073741824,  # 1GB
143
                            "pending": 0
144
                        }
145
                    }
146
                }
147
                issue_fun = "astakosclient.AstakosClient.issue_one_commission"
148
                with patch(issue_fun) as m3:
149
                    serials = []
150
                    append = serials.append
151

    
152
                    def get_serial(*args, **kwargs):
153
                        global serial
154
                        serial += 1
155
                        append(serial)
156
                        return serial
157

    
158
                    m3.side_effect = get_serial
159
                    resolv_fun = \
160
                        'astakosclient.AstakosClient.resolve_commissions'
161
                    with patch(resolv_fun) as m4:
162
                        m4.return_value = {'accepted': serials,
163
                                           'rejected': [],
164
                                           'failed': []}
165
                        users_fun = 'astakosclient.AstakosClient.get_usernames'
166
                        with patch(users_fun) as m5:
167

    
168
                            def get_usernames(*args, **kwargs):
169
                                uuids = args[-1]
170
                                return dict((uuid, uuid) for uuid in uuids)
171

    
172
                            m5.side_effect = get_usernames
173
                            yield
174

    
175

    
176
serial = 0
177

    
178

    
179
@contextmanager
180
def mocked_quotaholder(success=True):
181
    with patch("synnefo.quotas.Quotaholder.get") as astakos:
182
        global serial
183
        serial += 10
184

    
185
        def foo(*args, **kwargs):
186
            return (len(astakos.return_value.issue_one_commission.mock_calls) +
187
                    serial)
188
        astakos.return_value.issue_one_commission.side_effect = foo
189
        astakos.return_value.resolve_commissions.return_value = {"failed": []}
190
        yield astakos.return_value
191

    
192

    
193
class BaseAPITest(TestCase):
194
    def get(self, url, user='user', *args, **kwargs):
195
        with astakos_user(user):
196
            with mocked_quotaholder():
197
                response = self.client.get(url, *args, **kwargs)
198
        return response
199

    
200
    def delete(self, url, user='user'):
201
        with astakos_user(user):
202
            with mocked_quotaholder() as m:
203
                self.mocked_quotaholder = m
204
                response = self.client.delete(url)
205
        return response
206

    
207
    def post(self, url, user='user', params={}, ctype='json', *args, **kwargs):
208
        if ctype == 'json':
209
            content_type = 'application/json'
210
        with astakos_user(user):
211
            with mocked_quotaholder() as m:
212
                self.mocked_quotaholder = m
213
                response = self.client.post(url, params,
214
                                            content_type=content_type,
215
                                            *args, **kwargs)
216
        return response
217

    
218
    def put(self, url, user='user', params={}, ctype='json', *args, **kwargs):
219
        if ctype == 'json':
220
            content_type = 'application/json'
221
        with astakos_user(user):
222
            with mocked_quotaholder() as m:
223
                self.mocked_quotaholder = m
224
                response = self.client.put(url, params,
225
                                           content_type=content_type,
226
                                           *args, **kwargs)
227
        return response
228

    
229
    def assertSuccess(self, response):
230
        self.assertTrue(response.status_code in [200, 202, 203, 204])
231

    
232
    def assertFault(self, response, status_code, name):
233
        self.assertEqual(response.status_code, status_code)
234
        fault = json.loads(response.content)
235
        self.assertEqual(fault.keys(), [name])
236

    
237
    def assertBadRequest(self, response):
238
        self.assertFault(response, 400, 'badRequest')
239

    
240
    def assertConflict(self, response):
241
        self.assertFault(response, 409, 'conflict')
242

    
243
    def assertItemNotFound(self, response):
244
        self.assertFault(response, 404, 'itemNotFound')
245

    
246
    def assertMethodNotAllowed(self, response):
247
        self.assertFault(response, 400, 'badRequest')
248
        try:
249
            error = json.loads(response.content)
250
        except ValueError:
251
            self.assertTrue(False)
252
        self.assertEqual(error['badRequest']['message'], 'Method not allowed')
253

    
254

    
255
# Imitate unittest assertions new in Python 2.7
256

    
257
class _AssertRaisesContext(object):
258
    """
259
    A context manager used to implement TestCase.assertRaises* methods.
260
    Adapted from unittest2.
261
    """
262

    
263
    def __init__(self, expected):
264
        self.expected = expected
265

    
266
    def __enter__(self):
267
        return self
268

    
269
    def __exit__(self, exc_type, exc_value, tb):
270
        if exc_type is None:
271
            try:
272
                exc_name = self.expected.__name__
273
            except AttributeError:
274
                exc_name = str(self.expected)
275
            raise AssertionError(
276
                "%s not raised" % (exc_name,))
277
        if not issubclass(exc_type, self.expected):
278
            # let unexpected exceptions pass through
279
            return False
280
        self.exception = exc_value  # store for later retrieval
281
        return True
282

    
283

    
284
def assertRaises(excClass):
285
    return _AssertRaisesContext(excClass)
286

    
287

    
288
def assertGreater(x, y):
289
    assert x > y
290

    
291

    
292
def assertIn(x, y):
293
    assert x in y