Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / astakos / test.py @ b44a5a37

History | View | Annotate | Download (6.7 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from mock import patch, call
35
from unittest import TestCase
36

    
37
from kamaki.clients import astakos, ClientError
38

    
39

    
40
example = dict(
41
    access=dict(
42
        serviceCatalog=[
43
            dict(name='service name 1', type='compute', endpoints=[
44
                dict(versionId='v1', publicUrl='http://1.1.1.1/v1'),
45
                dict(versionId='v2', publicUrl='http://1.1.1.1/v2')]),
46
            dict(name='service name 2', type='image', endpoints=[
47
                dict(versionId='v2', publicUrl='http://1.1.1.1/v2'),
48
                dict(versionId='v2.1', publicUrl='http://1.1.1.1/v2/xtra')])
49
            ],
50
        user=dict(
51
            name='Simple Name',
52
            username='User Full Name',
53
            auth_token_expires='1362583796000',
54
            auth_token_created='1359991796000',
55
            email=['user@example.gr'],
56
            id=42,
57
            uuid='aus3r-uu1d-f0r-73s71ng-as7ak0s')
58
        )
59
    )
60

    
61
example0 = dict(
62
    access=dict(
63
         token=dict(
64
            expires="2013-07-14T10:07:42.481134+00:00",
65
            id="ast@k0sT0k3n==",
66
            tenant=dict(
67
                id="42",
68
                name="Simple Name 0")
69
        ),
70
        serviceCatalog=[
71
            dict(name='service name 1', type='compute', endpoints=[
72
                dict(versionId='v1', publicUrl='http://1.1.1.1/v1'),
73
                dict(versionId='v2', publicUrl='http://1.1.1.1/v2')]),
74
            dict(name='service name 3', type='object-storage', endpoints=[
75
                dict(versionId='v2', publicUrl='http://1.1.1.1/v2'),
76
                dict(versionId='v2.1', publicUrl='http://1.1.1.1/v2/xtra')])
77
            ],
78
        user=dict(
79
            name='Simple Name 0',
80
            username='User Full Name 0',
81
            auth_token_expires='1362585796000',
82
            auth_token_created='1359931796000',
83
            email=['user0@example.gr'],
84
            id=42,
85
            uuid='aus3r-uu1d-507-73s71ng-as7ak0s')
86
        )
87
    )
88

    
89

    
90
class FR(object):
91
    json = example
92
    headers = {}
93
    content = json
94
    status = None
95
    status_code = 200
96

    
97
astakos_pkg = 'kamaki.clients.astakos.AstakosClient'
98

    
99

    
100
class AstakosClient(TestCase):
101

    
102
    cached = False
103

    
104
    def assert_dicts_are_equal(self, d1, d2):
105
        for k, v in d1.items():
106
            self.assertTrue(k in d2)
107
            if isinstance(v, dict):
108
                self.assert_dicts_are_equal(v, d2[k])
109
            else:
110
                self.assertEqual(unicode(v), unicode(d2[k]))
111

    
112
    def setUp(self):
113
        self.url = 'https://astakos.example.com'
114
        self.token = 'ast@k0sT0k3n=='
115
        self.client = astakos.AstakosClient(self.url, self.token)
116

    
117
    def tearDown(self):
118
        FR.json = example
119

    
120
    @patch('%s.post' % astakos_pkg, return_value=FR())
121
    def _authenticate(self, post):
122
        r = self.client.authenticate()
123
        send_body = dict(auth=dict(token=dict(id=self.token)))
124
        self.assertEqual(post.mock_calls[-1], call('/tokens', json=send_body))
125
        self.cached = True
126
        return r
127

    
128
    def test_authenticate(self):
129
        r = self._authenticate()
130
        self.assert_dicts_are_equal(r, example)
131

    
132
    def test_get_services(self):
133
        if not self.cached:
134
            self._authenticate()
135
        slist = self.client.get_services()
136
        self.assertEqual(slist, example['access']['serviceCatalog'])
137

    
138
    def test_get_service_details(self):
139
        if not self.cached:
140
            self._authenticate()
141
        stype = '#FAIL'
142
        self.assertRaises(ClientError, self.client.get_service_details, stype)
143
        stype = 'compute'
144
        expected = [s for s in example['access']['serviceCatalog'] if (
145
            s['type'] == stype)]
146
        self.assert_dicts_are_equal(
147
            self.client.get_service_details(stype), expected[0])
148

    
149
    def test_get_service_endpoints(self):
150
        if not self.cached:
151
            self._authenticate()
152
        stype, version = 'compute', 'V2'
153
        self.assertRaises(
154
            ClientError, self.client.get_service_endpoints, stype)
155
        expected = [s for s in example['access']['serviceCatalog'] if (
156
            s['type'] == stype)]
157
        expected = [e for e in expected[0]['endpoints'] if (
158
            e['versionId'] == version.lower())]
159
        self.assert_dicts_are_equal(
160
            self.client.get_service_endpoints(stype, version), expected[0])
161

    
162
    def test_user_info(self):
163
        if not self.cached:
164
            self._authenticate()
165
        self.assertTrue(set(example['access']['user'].keys()).issubset(
166
            self.client.user_info().keys()))
167

    
168
    def test_item(self):
169
        if not self.cached:
170
            self._authenticate()
171
        for term, val in example['access']['user'].items():
172
            self.assertEqual(self.client.term(term, self.token), val)
173
        self.assertTrue(
174
            example['access']['user']['email'][0] in self.client.term('email'))
175

    
176
    def test_list_users(self):
177
        if not self.cached:
178
            self._authenticate()
179
        FR.json = example0
180
        self._authenticate()
181
        r = self.client.list_users()
182
        self.assertTrue(len(r) == 1)
183
        self.assertEqual(r[0]['auth_token'], self.token)
184

    
185
if __name__ == '__main__':
186
    from sys import argv
187
    from kamaki.clients.test import runTestCase
188
    runTestCase(AstakosClient, 'AstakosClient', argv[1:])