Statistics
| Branch: | Tag: | Revision:

root / snf-django-lib / snf_django / utils / testing.py @ 34bd6588

History | View | Annotate | Download (9.5 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
from contextlib import contextmanager
36
from django.test import TestCase
37
from django.utils import simplejson as json
38
from synnefo.util import text
39
from mock import patch
40

    
41

    
42
@contextmanager
43
def override_settings(settings, **kwargs):
44
    """
45
    Helper context manager to override django settings within the provided
46
    context.
47

48
    All keyword arguments provided are set to the django settings object and
49
    get reverted/removed when the manager exits.
50

51
    >>> from synnefo.util.testing import override_settings
52
    >>> from django.conf import settings
53
    >>> with override_settings(settings, DEBUG=True):
54
    >>>     assert settings.DEBUG == True
55

56
    The special arguemnt ``prefix`` can be set to prefix all setting keys with
57
    the provided value.
58

59
    >>> from django.conf import settings
60
    >>> from django.core import mail
61
    >>> with override_settings(settings, CONTACT_EMAILS=['kpap@grnet.gr'],
62
    >>>                        prefix='MYAPP_'):
63
    >>>     from django.core.mail import send_mail
64
    >>>     send_mail("hello", "I love you kpap", settings.DEFAULT_FROM_EMAIL,
65
    >>>               settings.MYAPP_CONTACT_EMAILS)
66
    >>>     assert 'kpap@grnet.gr' in mail.mailbox[0].recipients()
67

68
    If you plan to reuse it
69

70
    >>> import functools
71
    >>> from synnefo.util.testing import override_settings
72
    >>> from django.conf import settings
73
    >>> myapp_settings = functools.partial(override_settings, prefix='MYAPP_')
74
    >>> with myapp_settings(CONTACT_EMAILS=['kpap@grnet.gr'])
75
    >>>     assert settings.MYAPP_CONTACT_EMAILS == ['kpap@grnet.gr']
76

77
    """
78

    
79
    _prefix = kwargs.get('prefix', '')
80
    prefix = lambda key: '%s%s' % (_prefix, key)
81

    
82
    oldkeys = [k for k in dir(settings) if k.upper() == k]
83
    oldsettings = dict([(k, getattr(settings, k)) for k in oldkeys])
84

    
85
    toremove = []
86
    for key, value in kwargs.iteritems():
87
        key = prefix(key)
88
        if not hasattr(settings, key):
89
            toremove.append(key)
90
        setattr(settings, key, value)
91

    
92
    yield
93

    
94
    # Remove keys that didn't exist
95
    for key in toremove:
96
        delattr(settings, key)
97

    
98
    # Remove keys that added during the execution of the context
99
    if kwargs.get('reset_changes', True):
100
        newkeys = [k for k in dir(settings) if k.upper() == k]
101
        for key in newkeys:
102
            if not key in oldkeys:
103
                delattr(settings, key)
104

    
105
    # Revert old keys
106
    for key in oldkeys:
107
        if key == key.upper():
108
            setattr(settings, key, oldsettings.get(key))
109

    
110

    
111
def with_settings(settings, prefix='', **override):
112
    def wrapper(func):
113
        def inner(*args, **kwargs):
114
            with override_settings(settings, prefix=prefix, **override):
115
                ret = func(*args, **kwargs)
116
            return ret
117
        return inner
118
    return wrapper
119

    
120

    
121
@contextmanager
122
def astakos_user(user):
123
    """
124
    Context manager to mock astakos response.
125

126
    usage:
127
    with astakos_user("user@user.com"):
128
        .... make api calls ....
129

130
    """
131
    with patch("snf_django.lib.api.get_token") as get_token:
132
        get_token.return_value = "DummyToken"
133
        with patch('astakosclient.AstakosClient.get_user_info') as m:
134
            m.return_value = {"uuid": text.uenc(user, 'utf8')}
135
            with patch('astakosclient.AstakosClient.get_quotas') as m2:
136
                m2.return_value = {
137
                    "system": {
138
                        "pithos.diskspace": {
139
                            "usage": 0,
140
                            "limit": 1073741824,  # 1GB
141
                            "pending": 0
142
                        }
143
                    }
144
                }
145
                issue_fun = "astakosclient.AstakosClient.issue_one_commission"
146
                with patch(issue_fun) as m3:
147
                    serials = []
148
                    append = serials.append
149

    
150
                    def get_serial(*args, **kwargs):
151
                        global serial
152
                        serial += 1
153
                        append(serial)
154
                        return serial
155

    
156
                    m3.side_effect = get_serial
157
                    resolv_fun = \
158
                        'astakosclient.AstakosClient.resolve_commissions'
159
                    with patch(resolv_fun) as m4:
160
                        m4.return_value = {'accepted': serials,
161
                                           'rejected': [],
162
                                           'failed': []}
163
                        users_fun = 'astakosclient.AstakosClient.get_usernames'
164
                        with patch(users_fun) as m5:
165

    
166
                            def get_usernames(*args, **kwargs):
167
                                uuids = args[-1]
168
                                return dict((uuid, uuid) for uuid in uuids)
169

    
170
                            m5.side_effect = get_usernames
171
                            yield
172

    
173

    
174
serial = 0
175

    
176

    
177
@contextmanager
178
def mocked_quotaholder(success=True):
179
    with patch("synnefo.quotas.Quotaholder.get") as astakos:
180
        global serial
181
        serial += 10
182

    
183
        def foo(*args, **kwargs):
184
            return (len(astakos.return_value.issue_one_commission.mock_calls) +
185
                    serial)
186
        astakos.return_value.issue_one_commission.side_effect = foo
187
        astakos.return_value.resolve_commissions.return_value = {"failed": []}
188
        yield
189

    
190

    
191
class BaseAPITest(TestCase):
192
    def get(self, url, user='user', *args, **kwargs):
193
        with astakos_user(user):
194
            response = self.client.get(url, *args, **kwargs)
195
        return response
196

    
197
    def delete(self, url, user='user'):
198
        with astakos_user(user):
199
            response = self.client.delete(url)
200
        return response
201

    
202
    def post(self, url, user='user', params={}, ctype='json', *args, **kwargs):
203
        if ctype == 'json':
204
            content_type = 'application/json'
205
        with astakos_user(user):
206
            response = self.client.post(url, params, content_type=content_type,
207
                                        *args, **kwargs)
208
        return response
209

    
210
    def put(self, url, user='user', params={}, ctype='json', *args, **kwargs):
211
        if ctype == 'json':
212
            content_type = 'application/json'
213
        with astakos_user(user):
214
            response = self.client.put(url, params, content_type=content_type,
215
                                       *args, **kwargs)
216
        return response
217

    
218
    def assertSuccess(self, response):
219
        self.assertTrue(response.status_code in [200, 203, 204])
220

    
221
    def assertFault(self, response, status_code, name):
222
        self.assertEqual(response.status_code, status_code)
223
        fault = json.loads(response.content)
224
        self.assertEqual(fault.keys(), [name])
225

    
226
    def assertBadRequest(self, response):
227
        self.assertFault(response, 400, 'badRequest')
228

    
229
    def assertItemNotFound(self, response):
230
        self.assertFault(response, 404, 'itemNotFound')
231

    
232
    def assertMethodNotAllowed(self, response):
233
        self.assertFault(response, 400, 'badRequest')
234
        try:
235
            error = json.loads(response.content)
236
        except ValueError:
237
            self.assertTrue(False)
238
        self.assertEqual(error['badRequest']['message'], 'Method not allowed')
239

    
240

    
241
# Imitate unittest assertions new in Python 2.7
242

    
243
class _AssertRaisesContext(object):
244
    """
245
    A context manager used to implement TestCase.assertRaises* methods.
246
    Adapted from unittest2.
247
    """
248

    
249
    def __init__(self, expected):
250
        self.expected = expected
251

    
252
    def __enter__(self):
253
        return self
254

    
255
    def __exit__(self, exc_type, exc_value, tb):
256
        if exc_type is None:
257
            try:
258
                exc_name = self.expected.__name__
259
            except AttributeError:
260
                exc_name = str(self.expected)
261
            raise AssertionError(
262
                "%s not raised" % (exc_name,))
263
        if not issubclass(exc_type, self.expected):
264
            # let unexpected exceptions pass through
265
            return False
266
        self.exception = exc_value  # store for later retrieval
267
        return True
268

    
269

    
270
def assertRaises(excClass):
271
    return _AssertRaisesContext(excClass)
272

    
273

    
274
def assertGreater(x, y):
275
    assert x > y
276

    
277

    
278
def assertIn(x, y):
279
    assert x in y