root / kamaki / cli / utils / test.py @ 36fa6ffb
History | View | Annotate | Download (20.5 kB)
1 |
# Copyright 2013 GRNET S.A. All rights reserved.
|
---|---|
2 |
#
|
3 |
# Redistribution and use in source and binary forms, with or
|
4 |
# without modification, are permitted provided that the following
|
5 |
# conditions are met:
|
6 |
#
|
7 |
# 1. Redistributions of source code must retain the above
|
8 |
# copyright notice, this list of conditions and the following
|
9 |
# disclaimer.
|
10 |
#
|
11 |
# 2. Redistributions in binary form must reproduce the above
|
12 |
# copyright notice, this list of conditions and the following
|
13 |
# disclaimer in the documentation and/or other materials
|
14 |
# provided with the distribution.
|
15 |
#
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
# POSSIBILITY OF SUCH DAMAGE.
|
28 |
#
|
29 |
# The views and conclusions contained in the software and
|
30 |
# documentation are those of the authors and should not be
|
31 |
# interpreted as representing official policies, either expressed
|
32 |
# or implied, of GRNET S.A.
|
33 |
|
34 |
from unittest import TestCase |
35 |
from tempfile import NamedTemporaryFile |
36 |
from mock import patch, call |
37 |
from itertools import product |
38 |
from io import StringIO |
39 |
|
40 |
|
41 |
class UtilsMethods(TestCase): |
42 |
|
43 |
def assert_dicts_are_equal(self, d1, d2): |
44 |
for k, v in d1.items(): |
45 |
self.assertTrue(k in d2) |
46 |
if isinstance(v, dict): |
47 |
self.assert_dicts_are_equal(v, d2[k])
|
48 |
else:
|
49 |
self.assertEqual(unicode(v), unicode(d2[k])) |
50 |
|
51 |
def test_guess_mime_type(self): |
52 |
from kamaki.cli.utils import guess_mime_type |
53 |
from mimetypes import guess_type |
54 |
for args in product( |
55 |
('file.txt', 'file.png', 'file.zip', 'file.gz', None, 'X'), |
56 |
('a type', None), |
57 |
('an/encoding', None)): |
58 |
filename, ctype, cencoding = args |
59 |
if filename:
|
60 |
exp_type, exp_enc = guess_type(filename) |
61 |
self.assertEqual(
|
62 |
guess_mime_type(*args), |
63 |
(exp_type or ctype, exp_enc or cencoding)) |
64 |
else:
|
65 |
self.assertRaises(AssertionError, guess_mime_type, *args) |
66 |
|
67 |
@patch('kamaki.cli.utils.dumps', return_value='(dumps output)') |
68 |
def test_print_json(self, JD): |
69 |
from kamaki.cli.utils import print_json, INDENT_TAB |
70 |
out = StringIO() |
71 |
print_json('some data', out)
|
72 |
JD.assert_called_once_with('some data', indent=INDENT_TAB)
|
73 |
self.assertEqual(out.getvalue(), u'(dumps output)\n') |
74 |
|
75 |
def test_print_dict(self): |
76 |
from kamaki.cli.utils import print_dict, INDENT_TAB |
77 |
out = StringIO() |
78 |
self.assertRaises(AssertionError, print_dict, 'non-dict think') |
79 |
self.assertRaises(AssertionError, print_dict, {}, indent=-10) |
80 |
for args in product( |
81 |
( |
82 |
{'k1': 'v1'}, |
83 |
{'k1': 'v1', 'k2': 'v2'}, |
84 |
{'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}, |
85 |
{'k1': 'v1', 'k2': {'k1': 'v1', 'k2': 'v2'}, 'k3': 'v3'}, |
86 |
{ |
87 |
'k1': {'k1': 'v1', 'k2': 'v2'}, |
88 |
'k2': [1, 2, 3], |
89 |
'k3': 'v3'}, |
90 |
{ |
91 |
'k1': {'k1': 'v1', 'k2': 'v2'}, |
92 |
'k2': 42, |
93 |
'k3': {'k1': 1, 'k2': [1, 2, 3]}}, |
94 |
{ |
95 |
'k1': {
|
96 |
'k1': 'v1', |
97 |
'k2': [1, 2, 3], |
98 |
'k3': {'k1': [(1, 2)]}}, |
99 |
'k2': (3, 4, 5), |
100 |
'k3': {'k1': 1, 'k2': [1, 2, 3]}}), |
101 |
(tuple(), ('k1', ), ('k1', 'k2')), |
102 |
(0, 1, 2, 9), (False, True), (False, True)): |
103 |
d, exclude, indent, with_enumeration, recursive_enumeration = args |
104 |
with patch('kamaki.cli.utils.print_dict') as PD: |
105 |
with patch('kamaki.cli.utils.print_list') as PL: |
106 |
pd_calls, pl_calls = 0, 0 |
107 |
print_dict(*args, out=out) |
108 |
exp_calls = u''
|
109 |
for i, (k, v) in enumerate(d.items()): |
110 |
if k in exclude: |
111 |
continue
|
112 |
str_k = u' ' * indent
|
113 |
str_k += u'%s.' % (i + 1) if with_enumeration else u'' |
114 |
str_k += u'%s:' % k
|
115 |
if isinstance(v, dict): |
116 |
self.assertEqual(
|
117 |
PD.mock_calls[pd_calls], |
118 |
call( |
119 |
v, |
120 |
exclude, |
121 |
indent + INDENT_TAB, |
122 |
recursive_enumeration, |
123 |
recursive_enumeration, |
124 |
out)) |
125 |
pd_calls += 1
|
126 |
exp_calls += str_k + '\n'
|
127 |
elif isinstance(v, list) or isinstance(v, tuple): |
128 |
self.assertEqual(
|
129 |
PL.mock_calls[pl_calls], |
130 |
call( |
131 |
v, |
132 |
exclude, |
133 |
indent + INDENT_TAB, |
134 |
recursive_enumeration, |
135 |
recursive_enumeration, |
136 |
out)) |
137 |
pl_calls += 1
|
138 |
exp_calls += str_k + '\n'
|
139 |
else:
|
140 |
exp_calls += u'%s %s\n' % (str_k, v)
|
141 |
self.assertEqual(exp_calls, out.getvalue())
|
142 |
out = StringIO() |
143 |
|
144 |
def test_print_list(self): |
145 |
from kamaki.cli.utils import print_list, INDENT_TAB |
146 |
out = StringIO() |
147 |
self.assertRaises(AssertionError, print_list, 'non-list non-tuple') |
148 |
self.assertRaises(AssertionError, print_list, {}, indent=-10) |
149 |
for args in product( |
150 |
( |
151 |
['v1', ],
|
152 |
('v2', 'v3'), |
153 |
[1, '2', 'v3'], |
154 |
({'k1': 'v1'}, 2, 'v3'), |
155 |
[(1, 2), 'v2', [(3, 4), {'k3': [5, 6], 'k4': 7}]]), |
156 |
(tuple(), ('v1', ), ('v1', 1), ('v1', 'k3')), |
157 |
(0, 1, 2, 9), (False, True), (False, True)): |
158 |
l, exclude, indent, with_enumeration, recursive_enumeration = args |
159 |
with patch('kamaki.cli.utils.print_dict') as PD: |
160 |
with patch('kamaki.cli.utils.print_list') as PL: |
161 |
pd_calls, pl_calls = 0, 0 |
162 |
print_list(*args, out=out) |
163 |
exp_calls = ''
|
164 |
for i, v in enumerate(l): |
165 |
str_v = u' ' * indent
|
166 |
str_v += u'%s.' % (i + 1) if with_enumeration else u'' |
167 |
if isinstance(v, dict): |
168 |
if with_enumeration:
|
169 |
exp_calls += str_v + '\n'
|
170 |
elif i and i < len(l): |
171 |
exp_calls += u'\n'
|
172 |
self.assertEqual(
|
173 |
PD.mock_calls[pd_calls], |
174 |
call( |
175 |
v, |
176 |
exclude, |
177 |
indent + ( |
178 |
INDENT_TAB if with_enumeration else 0), |
179 |
recursive_enumeration, |
180 |
recursive_enumeration, |
181 |
out)) |
182 |
pd_calls += 1
|
183 |
elif isinstance(v, list) or isinstance(v, tuple): |
184 |
if with_enumeration:
|
185 |
exp_calls += str_v + '\n'
|
186 |
elif i and i < len(l): |
187 |
exp_calls += u'\n'
|
188 |
self.assertEqual(
|
189 |
PL.mock_calls[pl_calls], |
190 |
call( |
191 |
v, |
192 |
exclude, |
193 |
indent + INDENT_TAB, |
194 |
recursive_enumeration, |
195 |
recursive_enumeration, |
196 |
out)) |
197 |
pl_calls += 1
|
198 |
elif ('%s' % v) in exclude: |
199 |
continue
|
200 |
else:
|
201 |
exp_calls += u'%s%s\n' % (str_v, v)
|
202 |
self.assertEqual(out.getvalue(), exp_calls)
|
203 |
out = StringIO() |
204 |
|
205 |
@patch('kamaki.cli.utils.print_dict') |
206 |
@patch('kamaki.cli.utils.print_list') |
207 |
@patch('kamaki.cli.utils.bold', return_value='bold') |
208 |
def test_print_items(self, bold, PL, PD): |
209 |
from kamaki.cli.utils import print_items, INDENT_TAB |
210 |
for args in product( |
211 |
( |
212 |
42, None, 'simple outputs', |
213 |
[1, 2, 3], {1: 1, 2: 2}, (3, 4), |
214 |
({'k': 1, 'id': 2}, [5, 6, 7], (8, 9), '10')), |
215 |
(('id', 'name'), ('something', 2), ('lala', )), |
216 |
(False, True), |
217 |
(False, True)): |
218 |
items, title, with_enumeration, with_redundancy = args |
219 |
pl_counter, pd_counter = len(PL.mock_calls), len(PD.mock_calls) |
220 |
bold_counter, out_counter = len(bold.mock_calls), 0 |
221 |
out = StringIO() |
222 |
print_items(*args, out=out) |
223 |
out.seek(0)
|
224 |
if not (isinstance(items, dict) or isinstance( |
225 |
items, list) or isinstance(items, tuple)): |
226 |
if items:
|
227 |
self.assertEqual(out.getvalue(), '%s\n' % items) |
228 |
else:
|
229 |
for i, item in enumerate(items): |
230 |
if with_enumeration:
|
231 |
exp_str = '%s. ' % (i + 1) |
232 |
self.assertEqual(out.read(len(exp_str)), exp_str) |
233 |
if isinstance(item, dict): |
234 |
title = sorted(set(title).intersection(item)) |
235 |
pick = item.get if with_redundancy else item.pop |
236 |
header = ' '.join('%s' % pick(key) for key in title) |
237 |
if header:
|
238 |
self.assertEqual(
|
239 |
bold.mock_calls[bold_counter], call(header)) |
240 |
self.assertEqual(out.read(5), 'bold\n') |
241 |
bold_counter += 1
|
242 |
else:
|
243 |
out.read(1)
|
244 |
self.assertEqual(
|
245 |
PD.mock_calls[pd_counter], |
246 |
call(item, indent=INDENT_TAB, out=out)) |
247 |
pd_counter += 1
|
248 |
elif isinstance(item, list) or isinstance(item, tuple): |
249 |
self.assertEqual(
|
250 |
PL.mock_calls[pl_counter], |
251 |
call(item, indent=INDENT_TAB, out=out)) |
252 |
pl_counter += 1
|
253 |
else:
|
254 |
exp_str = u' %s\n' % item
|
255 |
self.assertEqual(out.read(len(exp_str)), exp_str) |
256 |
|
257 |
def test_format_size(self): |
258 |
from kamaki.cli.utils import format_size |
259 |
from kamaki.cli import CLIError |
260 |
for v in ('wrong', {1: '1', 2: '2'}, ('tuples', 'not OK'), [1, 2]): |
261 |
self.assertRaises(CLIError, format_size, v)
|
262 |
for step, B, K, M, G, T in ( |
263 |
(1000, 'B', 'KB', 'MB', 'GB', 'TB'), |
264 |
(1024, 'B', 'KiB', 'MiB', 'GiB', 'TiB')): |
265 |
Ki, Mi, Gi = step, step * step, step * step * step |
266 |
for before, after in ( |
267 |
(0, '0' + B), (512, '512' + B), ( |
268 |
Ki - 1, '%s%s' % (step - 1, B)), |
269 |
(Ki, '1' + K), (42 * Ki, '42' + K), ( |
270 |
Mi - 1, '%s.99%s' % (step - 1, K)), |
271 |
(Mi, '1' + M), (42 * Mi, '42' + M), ( |
272 |
Ki * Mi - 1, '%s.99%s' % (step - 1, M)), |
273 |
(Gi, '1' + G), (42 * Gi, '42' + G), ( |
274 |
Mi * Mi - 1, '%s.99%s' % (step - 1, G)), |
275 |
(Mi * Mi, '1' + T), (42 * Mi * Mi, '42' + T), ( |
276 |
Mi * Gi - 1, '%s.99%s' % (step - 1, T)), ( |
277 |
42 * Mi * Gi, '%s%s' % (42 * Ki, T))): |
278 |
self.assertEqual(format_size(before, step == 1000), after) |
279 |
|
280 |
def test_to_bytes(self): |
281 |
from kamaki.cli.utils import to_bytes |
282 |
for v in ('wrong', 'KABUM', 'kbps', 'kibps'): |
283 |
self.assertRaises(ValueError, to_bytes, v, 'B') |
284 |
self.assertRaises(ValueError, to_bytes, 42, v) |
285 |
for v in ([1, 2, 3], ('kb', 'mb'), {'kb': 1, 'byte': 2}): |
286 |
self.assertRaises(TypeError, to_bytes, v, 'B') |
287 |
self.assertRaises(AttributeError, to_bytes, 42, v) |
288 |
kl, ki = 1000, 1024 |
289 |
for size, (unit, factor) in product( |
290 |
(0, 42, 3.14, 1023, 10000), |
291 |
( |
292 |
('B', 1), ('b', 1), |
293 |
('KB', kl), ('KiB', ki), |
294 |
('mb', kl * kl), ('mIb', ki * ki), |
295 |
('gB', kl * kl * kl), ('GIB', ki * ki * ki), |
296 |
('TB', kl * kl * kl * kl), ('tiB', ki * ki * ki * ki))): |
297 |
self.assertEqual(to_bytes(size, unit), int(size * factor)) |
298 |
|
299 |
def test_dict2file(self): |
300 |
from kamaki.cli.utils import dict2file, INDENT_TAB |
301 |
for d, depth in product(( |
302 |
{'k': 42}, |
303 |
{'k1': 'v1', 'k2': [1, 2, 3], 'k3': {'k': 'v'}}, |
304 |
{'k1': {
|
305 |
'k1.1': 'v1.1', |
306 |
'k1.2': [1, 2, 3], |
307 |
'k1.3': {'k': 'v'}}}), |
308 |
(-42, 0, 42)): |
309 |
exp = ''
|
310 |
exp_d = [] |
311 |
exp_l = [] |
312 |
exp, exp_d, exp_l = '', [], []
|
313 |
with NamedTemporaryFile() as f: |
314 |
for k, v in d.items(): |
315 |
sfx = '\n'
|
316 |
if isinstance(v, dict): |
317 |
exp_d.append(call(v, f, depth + 1))
|
318 |
elif isinstance(v, tuple) or isinstance(v, list): |
319 |
exp_l.append(call(v, f, depth + 1))
|
320 |
else:
|
321 |
sfx = '%s\n' % v
|
322 |
exp += '%s%s: %s' % (
|
323 |
' ' * (depth * INDENT_TAB), k, sfx)
|
324 |
with patch('kamaki.cli.utils.dict2file') as D2F: |
325 |
with patch('kamaki.cli.utils.list2file') as L2F: |
326 |
dict2file(d, f, depth) |
327 |
f.seek(0)
|
328 |
self.assertEqual(f.read(), exp)
|
329 |
self.assertEqual(L2F.mock_calls, exp_l)
|
330 |
self.assertEqual(D2F.mock_calls, exp_d)
|
331 |
|
332 |
def test_list2file(self): |
333 |
from kamaki.cli.utils import list2file, INDENT_TAB |
334 |
for l, depth in product( |
335 |
( |
336 |
(1, 2, 3), |
337 |
[1, 2, 3], |
338 |
('v', [1, 2, 3], (1, 2, 3), {'1': 1, 2: '2', 3: 3}), |
339 |
['v', {'k1': 'v1', 'k2': [1, 2, 3], 'k3': {1: '1'}}]), |
340 |
(-42, 0, 42)): |
341 |
with NamedTemporaryFile() as f: |
342 |
exp, exp_d, exp_l = '', [], []
|
343 |
for v in l: |
344 |
if isinstance(v, dict): |
345 |
exp_d.append(call(v, f, depth + 1))
|
346 |
elif isinstance(v, list) or isinstance(v, tuple): |
347 |
exp_l.append(call(v, f, depth + 1))
|
348 |
else:
|
349 |
exp += '%s%s\n' % (' ' * INDENT_TAB * depth, v) |
350 |
with patch('kamaki.cli.utils.dict2file') as D2F: |
351 |
with patch('kamaki.cli.utils.list2file') as L2F: |
352 |
list2file(l, f, depth) |
353 |
f.seek(0)
|
354 |
self.assertEqual(f.read(), exp)
|
355 |
self.assertEqual(L2F.mock_calls, exp_l)
|
356 |
self.assertEqual(D2F.mock_calls, exp_d)
|
357 |
|
358 |
def test__parse_with_regex(self): |
359 |
from re import compile as r_compile |
360 |
from kamaki.cli.utils import _parse_with_regex |
361 |
for args in product( |
362 |
( |
363 |
'this is a line',
|
364 |
'this_is_also_a_line',
|
365 |
'This "text" is quoted',
|
366 |
'This "quoted" "text" is more "complicated"',
|
367 |
'Is this \'quoted\' text "double \'quoted\' or not?"',
|
368 |
'"What \'about\' the" oposite?',
|
369 |
' Try with a " single double quote',
|
370 |
'Go "down \'deep " deeper \'bottom \' up" go\' up" !'),
|
371 |
( |
372 |
'\'.*?\'|".*?"|^[\S]*$',
|
373 |
r'"([A-Za-z0-9_\./\\-]*)"',
|
374 |
r'\"(.+?)\"',
|
375 |
'\\^a\\.\\*\\$')):
|
376 |
r_parser = r_compile(args[1])
|
377 |
self.assertEqual(
|
378 |
_parse_with_regex(*args), |
379 |
(r_parser.split(args[0]), r_parser.findall(args[0]))) |
380 |
|
381 |
def test_split_input(self): |
382 |
from kamaki.cli.utils import split_input |
383 |
for line, expected in ( |
384 |
('set key="v1"', ['set', 'key=v1']), |
385 |
('unparsable', ['unparsable']), |
386 |
('"parsable"', ['parsable']), |
387 |
('"parse" out', ['parse', 'out']), |
388 |
('"one', ['"one']), |
389 |
('two" or" more"', ['two or', 'more"']), |
390 |
('Go "down \'deep " deeper \'bottom \' up" go\' up" !', [
|
391 |
'Go', "down 'deep ", 'deeper', 'bottom ', |
392 |
'up go\' up', '!']), |
393 |
('Is "this" a \'parsed\' string?', [
|
394 |
'Is', 'this', 'a', 'parsed', 'string?'])): |
395 |
self.assertEqual(split_input(line), expected)
|
396 |
|
397 |
def test_ask_user(self): |
398 |
from kamaki.cli.utils import ask_user |
399 |
msg = u'some question'
|
400 |
out = StringIO() |
401 |
user_in = StringIO(u'n')
|
402 |
self.assertFalse(ask_user(msg, out=out, user_in=user_in))
|
403 |
self.assertEqual(out.getvalue(), u'%s [y/N]: ' % msg) |
404 |
|
405 |
user_in.seek(0)
|
406 |
out.seek(0)
|
407 |
self.assertTrue(ask_user(msg, ('n', ), out=out, user_in=user_in)) |
408 |
self.assertEqual(out.getvalue(), u'%s [n/<not n>]: ' % msg) |
409 |
|
410 |
user_in = StringIO(unicode('N')) |
411 |
out.seek(0)
|
412 |
self.assertTrue(ask_user(msg, ('r', 'N'), out=out, user_in=user_in)) |
413 |
self.assertEqual(out.getvalue(), u'%s [r, N/<not r, N>]: ' % msg) |
414 |
|
415 |
def test_remove_from_items(self): |
416 |
from kamaki.cli.utils import remove_from_items |
417 |
for v in ('wrong', [1, 2, 3], [{}, 2, {}]): |
418 |
self.assertRaises(AssertionError, remove_from_items, v, 'none') |
419 |
d = dict(k1=1, k2=dict(k2=2, k3=3), k3=3, k4=4) |
420 |
for k in (d.keys() + ['kN']): |
421 |
tmp1, tmp2 = dict(d), dict(d) |
422 |
remove_from_items([tmp1, ], k) |
423 |
tmp1.pop(k, None)
|
424 |
self.assert_dicts_are_equal(tmp1, tmp2)
|
425 |
for k in (d.keys() + ['kN']): |
426 |
tmp1, tmp2 = dict(d), dict(d) |
427 |
remove_from_items([tmp1, tmp2], k) |
428 |
self.assert_dicts_are_equal(tmp1, tmp2)
|
429 |
|
430 |
def test_filter_dicts_by_dict(self): |
431 |
from kamaki.cli.utils import filter_dicts_by_dict |
432 |
|
433 |
dlist = [ |
434 |
dict(k1='v1', k2='v2', k3='v3'), |
435 |
dict(k1='v1'), |
436 |
dict(k2='v2', k3='v3'), |
437 |
dict(k1='V1', k3='V3'), |
438 |
dict()]
|
439 |
for l, f, em, cs, exp in ( |
440 |
(dlist, dlist[2], True, False, dlist[0:1] + dlist[2:3]), |
441 |
(dlist, dlist[1], True, False, dlist[0:2] + dlist[3:4]), |
442 |
(dlist, dlist[1], True, True, dlist[0:2]), |
443 |
(dlist, {'k3': 'v'}, True, False, []), |
444 |
(dlist, {'k3': 'v'}, False, False, dlist[0:1] + dlist[2:4]), |
445 |
(dlist, {'k3': 'v'}, False, True, dlist[0:1] + dlist[2:3]), |
446 |
(dlist, {'k3': 'v'}, True, True, []), |
447 |
(dlist, dlist[4], True, False, dlist), |
448 |
): |
449 |
self.assertEqual(exp, filter_dicts_by_dict(l, f, em, cs))
|
450 |
|
451 |
|
452 |
if __name__ == '__main__': |
453 |
from sys import argv |
454 |
from kamaki.cli.test import runTestCase |
455 |
runTestCase(UtilsMethods, 'UtilsMethods', argv[1:]) |