Statistics
| Branch: | Tag: | Revision:

root / snf-django-lib / snf_django / utils / testing.py @ f3787696

History | View | Annotate | Download (9.4 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34

    
35
from contextlib import contextmanager
36
from django.test import TestCase
37
from django.utils import simplejson as json
38
from mock import patch
39

    
40

    
41
@contextmanager
42
def override_settings(settings, **kwargs):
43
    """
44
    Helper context manager to override django settings within the provided
45
    context.
46

47
    All keyword arguments provided are set to the django settings object and
48
    get reverted/removed when the manager exits.
49

50
    >>> from synnefo.util.testing import override_settings
51
    >>> from django.conf import settings
52
    >>> with override_settings(settings, DEBUG=True):
53
    >>>     assert settings.DEBUG == True
54

55
    The special arguemnt ``prefix`` can be set to prefix all setting keys with
56
    the provided value.
57

58
    >>> from django.conf import settings
59
    >>> from django.core import mail
60
    >>> with override_settings(settings, CONTACT_EMAILS=['kpap@grnet.gr'],
61
    >>>                        prefix='MYAPP_'):
62
    >>>     from django.core.mail import send_mail
63
    >>>     send_mail("hello", "I love you kpap", settings.DEFAULT_FROM_EMAIL,
64
    >>>               settings.MYAPP_CONTACT_EMAILS)
65
    >>>     assert 'kpap@grnet.gr' in mail.mailbox[0].recipients()
66

67
    If you plan to reuse it
68

69
    >>> import functools
70
    >>> from synnefo.util.testing import override_settings
71
    >>> from django.conf import settings
72
    >>> myapp_settings = functools.partial(override_settings, prefix='MYAPP_')
73
    >>> with myapp_settings(CONTACT_EMAILS=['kpap@grnet.gr'])
74
    >>>     assert settings.MYAPP_CONTACT_EMAILS == ['kpap@grnet.gr']
75

76
    """
77

    
78
    _prefix = kwargs.get('prefix', '')
79
    prefix = lambda key: '%s%s' % (_prefix, key)
80

    
81
    oldkeys = [k for k in dir(settings) if k.upper() == k]
82
    oldsettings = dict([(k, getattr(settings, k)) for k in oldkeys])
83

    
84
    toremove = []
85
    for key, value in kwargs.iteritems():
86
        key = prefix(key)
87
        if not hasattr(settings, key):
88
            toremove.append(key)
89
        setattr(settings, key, value)
90

    
91
    yield
92

    
93
    # Remove keys that didn't exist
94
    for key in toremove:
95
        delattr(settings, key)
96

    
97
    # Remove keys that added during the execution of the context
98
    if kwargs.get('reset_changes', True):
99
        newkeys = [k for k in dir(settings) if k.upper() == k]
100
        for key in newkeys:
101
            if not key in oldkeys:
102
                delattr(settings, key)
103

    
104
    # Revert old keys
105
    for key in oldkeys:
106
        if key == key.upper():
107
            setattr(settings, key, oldsettings.get(key))
108

    
109

    
110
def with_settings(settings, prefix='', **override):
111
    def wrapper(func):
112
        def inner(*args, **kwargs):
113
            with override_settings(settings, prefix=prefix, **override):
114
                ret = func(*args, **kwargs)
115
            return ret
116
        return inner
117
    return wrapper
118

    
119
serial = 0
120

    
121

    
122
@contextmanager
123
def astakos_user(user):
124
    """
125
    Context manager to mock astakos response.
126

127
    usage:
128
    with astakos_user("user@user.com"):
129
        .... make api calls ....
130

131
    """
132
    with patch("snf_django.lib.api.get_token") as get_token:
133
        get_token.return_value = "DummyToken"
134
        with patch('astakosclient.AstakosClient.get_user_info') as m:
135
            m.return_value = {"uuid": user}
136
            with patch('astakosclient.AstakosClient.get_quotas') as m2:
137
                m2.return_value = {
138
                    "system": {
139
                        "pithos.diskspace": {
140
                            "usage": 0,
141
                            "limit": 1073741824,
142
                            "pending": 0
143
                        }
144
                    }
145
                }
146
                with patch('astakosclient.AstakosClient.issue_one_commission') as m3:
147
                    serials = []
148
                    append = serials.append
149

    
150
                    def get_serial(*args, **kwargs):
151
                        global serial
152
                        serial += 1
153
                        append(serial)
154
                        return serial
155

    
156
                    m3.side_effect = get_serial
157
                    with patch('astakosclient.AstakosClient.resolve_commissions') as m4:
158
                        m4.return_value = {'accepted': serials,
159
                                           'rejected': [],
160
                                           'failed': []}
161
                        with patch('astakosclient.AstakosClient.get_usernames') as m5:
162

    
163
                            def get_usernames(*args, **kwargs):
164
                                uuids = args[-1]
165
                                return dict((uuid, uuid) for uuid in uuids)
166

    
167
                            m5.side_effect = get_usernames
168
                            yield
169

    
170

    
171
@contextmanager
172
def mocked_quotaholder(success=True):
173
    with patch("synnefo.quotas.Quotaholder.get") as astakos:
174
        global serial
175
        serial += 1
176
        astakos.return_value.issue_one_commission.return_value = serial
177
        astakos.return_value.resolve_commissions.return_value = {"failed": []}
178
        yield astakos.return_value
179

    
180

    
181
class BaseAPITest(TestCase):
182
    def get(self, url, user='user', *args, **kwargs):
183
        with astakos_user(user):
184
            with mocked_quotaholder():
185
                response = self.client.get(url, *args, **kwargs)
186
        return response
187

    
188
    def delete(self, url, user='user'):
189
        with astakos_user(user):
190
            with mocked_quotaholder():
191
                response = self.client.delete(url)
192
        return response
193

    
194
    def post(self, url, user='user', params={}, ctype='json', *args, **kwargs):
195
        if ctype == 'json':
196
            content_type = 'application/json'
197
        with astakos_user(user):
198
            with mocked_quotaholder():
199
                response = self.client.post(url, params,
200
                                            content_type=content_type,
201
                                            *args, **kwargs)
202
        return response
203

    
204
    def put(self, url, user='user', params={}, ctype='json', *args, **kwargs):
205
        if ctype == 'json':
206
            content_type = 'application/json'
207
        with astakos_user(user):
208
            with mocked_quotaholder():
209
                response = self.client.put(url, params,
210
                                           content_type=content_type,
211
                                           *args, **kwargs)
212
        return response
213

    
214
    def assertSuccess(self, response):
215
        self.assertTrue(response.status_code in [200, 202, 203, 204])
216

    
217
    def assertFault(self, response, status_code, name):
218
        self.assertEqual(response.status_code, status_code)
219
        fault = json.loads(response.content)
220
        self.assertEqual(fault.keys(), [name])
221

    
222
    def assertBadRequest(self, response):
223
        self.assertFault(response, 400, 'badRequest')
224

    
225
    def assertItemNotFound(self, response):
226
        self.assertFault(response, 404, 'itemNotFound')
227

    
228
    def assertMethodNotAllowed(self, response):
229
        self.assertFault(response, 400, 'badRequest')
230
        try:
231
            error = json.loads(response.content)
232
        except ValueError:
233
            self.assertTrue(False)
234
        self.assertEqual(error['badRequest']['message'], 'Method not allowed')
235

    
236

    
237
# Imitate unittest assertions new in Python 2.7
238

    
239
class _AssertRaisesContext(object):
240
    """
241
    A context manager used to implement TestCase.assertRaises* methods.
242
    Adapted from unittest2.
243
    """
244

    
245
    def __init__(self, expected):
246
        self.expected = expected
247

    
248
    def __enter__(self):
249
        return self
250

    
251
    def __exit__(self, exc_type, exc_value, tb):
252
        if exc_type is None:
253
            try:
254
                exc_name = self.expected.__name__
255
            except AttributeError:
256
                exc_name = str(self.expected)
257
            raise AssertionError(
258
                "%s not raised" % (exc_name,))
259
        if not issubclass(exc_type, self.expected):
260
            # let unexpected exceptions pass through
261
            return False
262
        self.exception = exc_value  # store for later retrieval
263
        return True
264

    
265

    
266
def assertRaises(excClass):
267
    return _AssertRaisesContext(excClass)
268

    
269

    
270
def assertGreater(x, y):
271
    assert x > y
272

    
273

    
274
def assertIn(x, y):
275
    assert x in y