root / kamaki / cli / config / test.py @ 2e616dbd
History | View | Annotate | Download (14.2 kB)
1 |
# Copyright 2013 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or
|
4 |
# without modification, are permitted provided that the following
|
5 |
# conditions are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above
|
8 |
# copyright notice, this list of conditions and the following
|
9 |
# disclaimer.
|
10 |
#
|
11 |
# 2. Redistributions in binary form must reproduce the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer in the documentation and/or other materials
|
14 |
# provided with the distribution.
|
15 |
#
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
# POSSIBILITY OF SUCH DAMAGE.
|
28 |
#
|
29 |
# The views and conclusions contained in the software and
|
30 |
# documentation are those of the authors and should not be
|
31 |
# interpreted as representing official policies, either expressed
|
32 |
# or implied, of GRNET S.A.
|
33 |
|
34 |
from mock import patch, call |
35 |
from unittest import TestCase |
36 |
from itertools import product |
37 |
import os |
38 |
from tempfile import NamedTemporaryFile |
39 |
from io import StringIO |
40 |
|
41 |
|
42 |
def _2steps_gen(limit=2): |
43 |
counter, ret = 0, None |
44 |
while True: |
45 |
if counter >= limit:
|
46 |
ret = None if ret else 'something' |
47 |
counter = 0
|
48 |
counter += 1
|
49 |
yield ret
|
50 |
|
51 |
_2value_gen = _2steps_gen() |
52 |
|
53 |
|
54 |
class Config(TestCase): |
55 |
"""Test Config methods"""
|
56 |
|
57 |
config_file_content = [ |
58 |
'#kamaki config file version 0.9\n',
|
59 |
'[global]\n',
|
60 |
'max_threads = 5\n',
|
61 |
'default_cloud = ~mycloud\n',
|
62 |
'file_cli = pithos\n',
|
63 |
'history_file = /home/user/.kamaki.history\n',
|
64 |
'colors = off\n',
|
65 |
'config_cli = config\n',
|
66 |
'history_cli = history\n',
|
67 |
'log_token = off\n',
|
68 |
'server_cli = cyclades\n',
|
69 |
'user_cli = astakos\n',
|
70 |
'log_data = off\n',
|
71 |
'flavor_cli = cyclades\n',
|
72 |
'image_cli = image\n',
|
73 |
'log_file = /home/user/.kamaki.log\n',
|
74 |
'network_cli = cyclades\n',
|
75 |
'log_pid = off\n',
|
76 |
'\n',
|
77 |
'[cloud "demo"]\n',
|
78 |
'url = https://demo.example.com\n',
|
79 |
'token = t0k3n-0f-d3m0-3x4mp13\n',
|
80 |
'\n',
|
81 |
'[cloud "~mycloud"]\n',
|
82 |
'url = https://example.com\n',
|
83 |
'pithos_container = images\n']
|
84 |
|
85 |
def setUp(self): |
86 |
self.f = NamedTemporaryFile()
|
87 |
|
88 |
def readDown(self): |
89 |
try:
|
90 |
self.f.close()
|
91 |
except Exception: |
92 |
pass
|
93 |
|
94 |
@patch('kamaki.cli.config.Config.remove_section') |
95 |
@patch('kamaki.cli.config.Config.items', return_value=( |
96 |
('k1', 'v1'), ('k2', 'v2'))) |
97 |
@patch('kamaki.cli.config.Config.sections', return_value=('a', 'b')) |
98 |
@patch('kamaki.cli.config.Config.set_cloud') |
99 |
@patch('kamaki.cli.config.Config.read') |
100 |
@patch('kamaki.cli.config.Config._load_defaults') |
101 |
def test___init__( |
102 |
self, _ld, c_read, c_set_cloud, c_sections, c_items,
|
103 |
c_remove_section): |
104 |
from kamaki.cli.config import ( |
105 |
Config, RawConfigParser, CONFIG_ENV, CONFIG_PATH) |
106 |
_ld_num, c_sections_num, c_items_num, c_set_cloud_num = 0, 0, 0, 0 |
107 |
c_remove_section_num, gen_call = 0, [call('a'), call('b')] |
108 |
for path, with_defaults in product((None, '/a/path'), (True, False)): |
109 |
with patch(
|
110 |
'kamaki.cli.config.Config._cloud_name',
|
111 |
return_value=_2value_gen.next()) as _cloud_name:
|
112 |
cnf = Config(path=path, with_defaults=with_defaults) |
113 |
self.assertTrue(isinstance(cnf, RawConfigParser)) |
114 |
cpath = path or os.environ.get(CONFIG_ENV, CONFIG_PATH)
|
115 |
self.assertEqual(cnf.path, cpath)
|
116 |
if with_defaults:
|
117 |
_ld_num += 1
|
118 |
self.assertEqual(_ld.mock_calls[-1], call()) |
119 |
self.assertEqual(len(_ld.mock_calls), _ld_num) |
120 |
self.assertEqual(c_read.mock_calls[-1], call(cpath)) |
121 |
|
122 |
c_sections_num += 1
|
123 |
self.assertEqual(len(c_sections.mock_calls), c_sections_num) |
124 |
self.assertEqual(c_sections.mock_calls[-1], call()) |
125 |
|
126 |
self.assertEqual(_cloud_name.mock_calls, gen_call)
|
127 |
|
128 |
r = _2value_gen.next() |
129 |
if r:
|
130 |
c_items_num += 2
|
131 |
self.assertEqual(c_items.mock_calls[-2:], gen_call) |
132 |
c_set_cloud_num += 4
|
133 |
self.assertEqual(c_set_cloud.mock_calls[-4:], [ |
134 |
call(r, 'k1', 'v1'), call(r, 'k2', 'v2')] * 2) |
135 |
c_remove_section_num += 2
|
136 |
self.assertEqual(
|
137 |
c_remove_section.mock_calls[-2:], gen_call)
|
138 |
self.assertEqual(len(c_items.mock_calls), c_items_num) |
139 |
self.assertEqual(len(c_set_cloud.mock_calls), c_set_cloud_num) |
140 |
self.assertEqual(
|
141 |
len(c_remove_section.mock_calls), c_remove_section_num)
|
142 |
|
143 |
def test__cloud_name(self): |
144 |
from kamaki.cli.config import ( |
145 |
Config, CLOUD_PREFIX, InvalidCloudNameError) |
146 |
cn = Config._cloud_name |
147 |
self.assertEqual(cn('non%s name' % CLOUD_PREFIX), None) |
148 |
for invalid in ('"!@#$%^&())_"', '"a b c"', u'"\xce\xcd"', 'naked'): |
149 |
self.assertRaises(
|
150 |
InvalidCloudNameError, cn, '%s %s' % (CLOUD_PREFIX, invalid))
|
151 |
for valid in ('word', '~okeanos', 'd0t.ted', 'ha$h#ed'): |
152 |
self.assertEqual(cn('%s "%s"' % (CLOUD_PREFIX, valid)), valid) |
153 |
|
154 |
def test_rescue_old_file(self): |
155 |
from kamaki.cli.config import Config |
156 |
|
157 |
content0 = list(self.config_file_content) |
158 |
|
159 |
def make_file(lines): |
160 |
f = NamedTemporaryFile() |
161 |
f.writelines(lines) |
162 |
f.flush() |
163 |
return f
|
164 |
|
165 |
with make_file(content0) as f: |
166 |
_cnf = Config(path=f.name) |
167 |
self.assertEqual([], _cnf.rescue_old_file())
|
168 |
del _cnf
|
169 |
|
170 |
content1, sample = list(content0), 'xyz_cli = XYZ_specs' |
171 |
content1.insert(2, '%s\n' % sample) |
172 |
|
173 |
with make_file(content1) as f: |
174 |
_cnf = Config(path=f.name) |
175 |
self.assertEqual(['global.%s' % sample], _cnf.rescue_old_file()) |
176 |
del _cnf
|
177 |
|
178 |
content2, sample = list(content0), 'http://www.example2.org' |
179 |
content2.insert(2, 'url = %s\n' % sample) |
180 |
err = StringIO() |
181 |
|
182 |
with make_file(content2) as f: |
183 |
_cnf = Config(path=f.name) |
184 |
self.assertEqual([], _cnf.rescue_old_file(err=err))
|
185 |
self.assertEqual(
|
186 |
'... rescue global.url => cloud.default.url\n', err.getvalue())
|
187 |
self.assertEqual(sample, _cnf.get_cloud('default', 'url')) |
188 |
del _cnf
|
189 |
|
190 |
content3 = list(content0)
|
191 |
content3.insert( |
192 |
2, 'url = http://example1.com\nurl = http://example2.com\n') |
193 |
|
194 |
with make_file(content3) as f: |
195 |
_cnf = Config(path=f.name) |
196 |
self.assertEqual([], _cnf.rescue_old_file(err=err))
|
197 |
self.assertEqual(
|
198 |
2 * '... rescue global.url => cloud.default.url\n', |
199 |
err.getvalue()) |
200 |
self.assertEqual(
|
201 |
'http://example2.com', _cnf.get_cloud('default', 'url')) |
202 |
del _cnf
|
203 |
|
204 |
content4 = list(content0)
|
205 |
content4.insert(2, 'url = http://example1.com\n') |
206 |
content4.append('\n[cloud "default"]\nurl=http://example2.com\n')
|
207 |
|
208 |
with make_file(content4) as f: |
209 |
_cnf = Config(path=f.name) |
210 |
from kamaki.cli.errors import CLISyntaxError |
211 |
self.assertRaises(CLISyntaxError, _cnf.rescue_old_file)
|
212 |
del _cnf
|
213 |
|
214 |
content5 = list(content0)
|
215 |
extras = [ |
216 |
('pithos_cli', 'pithos'), ('store_cli', 'pithos'), |
217 |
('storage_cli', 'pithos'), ('compute_cli', 'cyclades'), |
218 |
('cyclades_cli', 'cyclades')] |
219 |
for sample in extras: |
220 |
content5.insert(2, '%s = %s\n' % sample) |
221 |
|
222 |
with make_file(content5) as f: |
223 |
_cnf = Config(path=f.name) |
224 |
self.assertEqual(
|
225 |
sorted(['global.%s = %s' % sample for sample in extras]), |
226 |
sorted(_cnf.rescue_old_file()))
|
227 |
|
228 |
def test_guess_version(self): |
229 |
from kamaki.cli.config import Config |
230 |
from kamaki.cli.logger import add_file_logger |
231 |
|
232 |
def make_log_file(): |
233 |
f = NamedTemporaryFile() |
234 |
add_file_logger('kamaki.cli.config', filename=f.name)
|
235 |
return f
|
236 |
|
237 |
def make_file(lines): |
238 |
f = NamedTemporaryFile() |
239 |
f.writelines(lines) |
240 |
f.flush() |
241 |
return f
|
242 |
|
243 |
with make_file([]) as f: |
244 |
with make_log_file() as logf: |
245 |
_cnf = Config(path=f.name) |
246 |
self.assertEqual(0.9, _cnf.guess_version()) |
247 |
exp = 'All heuristics failed, cannot decide\n'
|
248 |
logf.file.seek(- len(exp), 2) |
249 |
self.assertEqual(exp, logf.read())
|
250 |
|
251 |
content0 = list(self.config_file_content) |
252 |
|
253 |
with make_file(content0) as f: |
254 |
with make_log_file() as logf: |
255 |
_cnf = Config(path=f.name) |
256 |
self.assertEqual(0.9, _cnf.guess_version()) |
257 |
exp = '... found cloud "demo"\n'
|
258 |
logf.seek(- len(exp), 2) |
259 |
self.assertEqual(exp, logf.read())
|
260 |
|
261 |
for term in ('url', 'token'): |
262 |
content1 = list(content0)
|
263 |
content1.insert(2, '%s = some_value' % term) |
264 |
|
265 |
with make_file(content1) as f: |
266 |
with make_log_file() as logf: |
267 |
_cnf = Config(path=f.name) |
268 |
self.assertEqual(0.8, _cnf.guess_version()) |
269 |
exp = '..... config file has an old global section\n'
|
270 |
logf.seek(- len(exp), 2) |
271 |
self.assertEqual(exp, logf.read())
|
272 |
|
273 |
def test_get_cloud(self): |
274 |
from kamaki.cli.config import Config, CLOUD_PREFIX |
275 |
|
276 |
_cnf = Config(path=self.f.name)
|
277 |
d = dict(opt1='v1', opt2='v2') |
278 |
with patch('kamaki.cli.config.Config.get', return_value=d) as get: |
279 |
self.assertEqual('v1', _cnf.get_cloud('mycloud', 'opt1')) |
280 |
self.assertEqual(
|
281 |
get.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud')) |
282 |
self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt3') |
283 |
with patch('kamaki.cli.config.Config.get', return_value=0) as get: |
284 |
self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt1') |
285 |
|
286 |
def test_get_global(self): |
287 |
from kamaki.cli.config import Config |
288 |
|
289 |
_cnf = Config(path=self.f.name)
|
290 |
with patch('kamaki.cli.config.Config.get', return_value='val') as get: |
291 |
self.assertEqual('val', _cnf.get_global('opt')) |
292 |
get.assert_called_once_with('global', 'opt') |
293 |
|
294 |
@patch('kamaki.cli.config.Config.set') |
295 |
def test_set_cloud(self, c_set): |
296 |
from kamaki.cli.config import Config, CLOUD_PREFIX |
297 |
_cnf = Config(path=self.f.name)
|
298 |
|
299 |
d = dict(k='v') |
300 |
with patch('kamaki.cli.config.Config.get', return_value=d) as get: |
301 |
_cnf.set_cloud('mycloud', 'opt', 'val') |
302 |
get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
|
303 |
d['opt'] = 'val' |
304 |
self.assertEqual(
|
305 |
c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d)) |
306 |
|
307 |
with patch('kamaki.cli.config.Config.get', return_value=None) as get: |
308 |
_cnf.set_cloud('mycloud', 'opt', 'val') |
309 |
get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
|
310 |
d = dict(opt='val') |
311 |
self.assertEqual(
|
312 |
c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d)) |
313 |
|
314 |
with patch(
|
315 |
'kamaki.cli.config.Config.get', side_effect=KeyError()) as get: |
316 |
_cnf.set_cloud('mycloud', 'opt', 'val') |
317 |
get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
|
318 |
d = dict(opt='val') |
319 |
self.assertEqual(
|
320 |
c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d)) |
321 |
|
322 |
def test_set_global(self): |
323 |
from kamaki.cli.config import Config |
324 |
_cnf = Config(path=self.f.name)
|
325 |
|
326 |
with patch('kamaki.cli.config.Config.set') as c_set: |
327 |
_cnf.set_global('opt', 'val') |
328 |
c_set.assert_called_once_with('global', 'opt', 'val') |
329 |
|
330 |
def test__load_defaults(self): |
331 |
from kamaki.cli.config import Config, DEFAULTS |
332 |
_cnf = Config(path=self.f.name)
|
333 |
|
334 |
with patch('kamaki.cli.config.Config.set') as c_set: |
335 |
_cnf._load_defaults() |
336 |
for i, (section, options) in enumerate(DEFAULTS.items()): |
337 |
for j, (option, val) in enumerate(options.items()): |
338 |
self.assertEqual(
|
339 |
c_set.mock_calls[(i + 1) * j],
|
340 |
call(section, option, val)) |
341 |
|
342 |
def test__get_dict(self): |
343 |
from kamaki.cli.config import Config, CLOUD_PREFIX, DEFAULTS |
344 |
|
345 |
def make_file(lines): |
346 |
f = NamedTemporaryFile() |
347 |
f.writelines(lines) |
348 |
f.flush() |
349 |
return f
|
350 |
|
351 |
with make_file([]) as f: |
352 |
_cnf = Config(path=f.name) |
353 |
for term in ('global', CLOUD_PREFIX): |
354 |
self.assertEqual(DEFAULTS[term], _cnf._get_dict(term))
|
355 |
for term in ('nosection', ''): |
356 |
self.assertEqual({}, _cnf._get_dict(term))
|
357 |
|
358 |
with make_file(self.config_file_content) as f: |
359 |
_cnf = Config(path=f.name) |
360 |
for term in ('global', CLOUD_PREFIX): |
361 |
self.assertNotEqual(DEFAULTS[term], _cnf._get_dict(term))
|
362 |
|
363 |
def test_reload(self): |
364 |
from kamaki.cli.config import Config |
365 |
_cnf = Config(path=self.f.name)
|
366 |
|
367 |
with patch('kamaki.cli.config.Config.__init__') as i: |
368 |
_cnf.reload() |
369 |
i.assert_called_once_with(self.f.name)
|
370 |
|
371 |
|
372 |
if __name__ == '__main__': |
373 |
from sys import argv |
374 |
from kamaki.cli.test import runTestCase |
375 |
runTestCase(Config, 'Config', argv[1:]) |