Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / utils / test.py @ 854222c7

History | View | Annotate | Download (22.7 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import TestCase
35
from tempfile import NamedTemporaryFile
36
from mock import patch, call
37
from itertools import product
38

    
39

    
40
class UtilsMethods(TestCase):
41

    
42
    def assert_dicts_are_equal(self, d1, d2):
43
        for k, v in d1.items():
44
            self.assertTrue(k in d2)
45
            if isinstance(v, dict):
46
                self.assert_dicts_are_equal(v, d2[k])
47
            else:
48
                self.assertEqual(unicode(v), unicode(d2[k]))
49

    
50
    def test_guess_mime_type(self):
51
        from kamaki.cli.utils import guess_mime_type
52
        from mimetypes import guess_type
53
        for args in product(
54
                ('file.txt', 'file.png', 'file.zip', 'file.gz', None, 'X'),
55
                ('a type', None),
56
                ('an/encoding', None)):
57
            filename, ctype, cencoding = args
58
            if filename:
59
                exp_type, exp_enc = guess_type(filename)
60
                self.assertEqual(
61
                    guess_mime_type(*args),
62
                    (exp_type or ctype, exp_enc or cencoding))
63
            else:
64
                self.assertRaises(AssertionError, guess_mime_type, *args)
65

    
66
    @patch('kamaki.cli.utils.dumps', return_value='(dumps output)')
67
    @patch('kamaki.cli.utils._print')
68
    def test_print_json(self, PR, JD):
69
        from kamaki.cli.utils import print_json, INDENT_TAB
70
        print_json('some data')
71
        JD.assert_called_once_with('some data', indent=INDENT_TAB)
72
        PR.assert_called_once_with('(dumps output)')
73

    
74
    @patch('kamaki.cli.utils._print')
75
    def test_print_dict(self, PR):
76
        from kamaki.cli.utils import print_dict, INDENT_TAB
77
        call_counter = 0
78
        self.assertRaises(AssertionError, print_dict, 'non-dict think')
79
        self.assertRaises(AssertionError, print_dict, {}, indent=-10)
80
        for args in product(
81
                (
82
                    {'k1': 'v1'},
83
                    {'k1': 'v1', 'k2': 'v2'},
84
                    {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'},
85
                    {'k1': 'v1', 'k2': {'k1': 'v1', 'k2': 'v2'}, 'k3': 'v3'},
86
                    {
87
                        'k1': {'k1': 'v1', 'k2': 'v2'},
88
                        'k2': [1, 2, 3],
89
                        'k3': 'v3'},
90
                    {
91
                        'k1': {'k1': 'v1', 'k2': 'v2'},
92
                        'k2': 42,
93
                        'k3': {'k1': 1, 'k2': [1, 2, 3]}},
94
                    {
95
                        'k1': {
96
                            'k1': 'v1',
97
                            'k2': [1, 2, 3],
98
                            'k3': {'k1': [(1, 2)]}},
99
                        'k2': (3, 4, 5),
100
                        'k3': {'k1': 1, 'k2': [1, 2, 3]}}),
101
                (tuple(), ('k1', ), ('k1', 'k2')),
102
                (0, 1, 2, 9), (False, True), (False, True)):
103
            d, exclude, indent, with_enumeration, recursive_enumeration = args
104
            with patch('kamaki.cli.utils.print_dict') as PD:
105
                with patch('kamaki.cli.utils.print_list') as PL:
106
                    pd_calls, pl_calls = 0, 0
107
                    print_dict(*args)
108
                    exp_calls = []
109
                    for i, (k, v) in enumerate(d.items()):
110
                        if k in exclude:
111
                            continue
112
                        str_k = ' ' * indent
113
                        str_k += '%s.' % (i + 1) if with_enumeration else ''
114
                        str_k += '%s:' % k
115
                        if isinstance(v, dict):
116
                            self.assertEqual(
117
                                PD.mock_calls[pd_calls],
118
                                call(
119
                                    v,
120
                                    exclude,
121
                                    indent + INDENT_TAB,
122
                                    recursive_enumeration,
123
                                    recursive_enumeration))
124
                            pd_calls += 1
125
                            exp_calls.append(call(str_k))
126
                        elif isinstance(v, list) or isinstance(v, tuple):
127
                            self.assertEqual(
128
                                PL.mock_calls[pl_calls],
129
                                call(
130
                                    v,
131
                                    exclude,
132
                                    indent + INDENT_TAB,
133
                                    recursive_enumeration,
134
                                    recursive_enumeration))
135
                            pl_calls += 1
136
                            exp_calls.append(call(str_k))
137
                        else:
138
                            exp_calls.append(call('%s %s' % (str_k, v)))
139
                    real_calls = PR.mock_calls[call_counter:]
140
                    call_counter = len(PR.mock_calls)
141
                    self.assertEqual(sorted(real_calls), sorted(exp_calls))
142

    
143
    @patch('kamaki.cli.utils._print')
144
    def test_print_list(self, PR):
145
        from kamaki.cli.utils import print_list, INDENT_TAB
146
        call_counter = 0
147
        self.assertRaises(AssertionError, print_list, 'non-list non-tuple')
148
        self.assertRaises(AssertionError, print_list, {}, indent=-10)
149
        for args in product(
150
                (
151
                    ['v1', ],
152
                    ('v2', 'v3'),
153
                    [1, '2', 'v3'],
154
                    ({'k1': 'v1'}, 2, 'v3'),
155
                    [(1, 2), 'v2', [(3, 4), {'k3': [5, 6], 'k4': 7}]]),
156
                (tuple(), ('v1', ), ('v1', 1), ('v1', 'k3')),
157
                (0, 1, 2, 9), (False, True), (False, True)):
158
            l, exclude, indent, with_enumeration, recursive_enumeration = args
159
            with patch('kamaki.cli.utils.print_dict') as PD:
160
                with patch('kamaki.cli.utils.print_list') as PL:
161
                    pd_calls, pl_calls = 0, 0
162
                    print_list(*args)
163
                    exp_calls = []
164
                    for i, v in enumerate(l):
165
                        str_v = ' ' * indent
166
                        str_v += '%s.' % (i + 1) if with_enumeration else ''
167
                        if isinstance(v, dict):
168
                            if with_enumeration:
169
                                exp_calls.append(call(str_v))
170
                            elif i and i < len(l):
171
                                exp_calls.append(call())
172
                            self.assertEqual(
173
                                PD.mock_calls[pd_calls],
174
                                call(
175
                                    v,
176
                                    exclude,
177
                                    indent + (
178
                                        INDENT_TAB if with_enumeration else 0),
179
                                    recursive_enumeration,
180
                                    recursive_enumeration))
181
                            pd_calls += 1
182
                        elif isinstance(v, list) or isinstance(v, tuple):
183
                            if with_enumeration:
184
                                exp_calls.append(call(str_v))
185
                            elif i and i < len(l):
186
                                exp_calls.append(call())
187
                            self.assertEqual(
188
                                PL.mock_calls[pl_calls],
189
                                call(
190
                                    v,
191
                                    exclude,
192
                                    indent + INDENT_TAB,
193
                                    recursive_enumeration,
194
                                    recursive_enumeration))
195
                            pl_calls += 1
196
                        elif ('%s' % v) in exclude:
197
                            continue
198
                        else:
199
                            exp_calls.append(call('%s%s' % (str_v, v)))
200
                    real_calls = PR.mock_calls[call_counter:]
201
                    call_counter = len(PR.mock_calls)
202
                    self.assertEqual(sorted(real_calls), sorted(exp_calls))
203

    
204
    @patch('__builtin__.raw_input')
205
    def test_page_hold(self, RI):
206
        from kamaki.cli.utils import page_hold
207
        ri_counter = 0
208
        for args, expected in (
209
                ((0, 0, 0), False),
210
                ((1, 3, 10), True),
211
                ((3, 3, 10), True),
212
                ((5, 3, 10), True),
213
                ((6, 3, 10), True),
214
                ((10, 3, 10), False),
215
                ((11, 3, 10), False)):
216
            self.assertEqual(page_hold(*args), expected)
217
            index, limit, maxlen = args
218
            if index and index < maxlen and index % limit == 0:
219
                self.assertEqual(ri_counter + 1, len(RI.mock_calls))
220
                self.assertEqual(RI.mock_calls[-1], call(
221
                    '(%s listed - %s more - "enter" to continue)' % (
222
                        index, maxlen - index)))
223
            else:
224
                self.assertEqual(ri_counter, len(RI.mock_calls))
225
            ri_counter = len(RI.mock_calls)
226

    
227
    @patch('kamaki.cli.utils._print')
228
    @patch('kamaki.cli.utils._write')
229
    @patch('kamaki.cli.utils.print_dict')
230
    @patch('kamaki.cli.utils.print_list')
231
    @patch('kamaki.cli.utils.page_hold')
232
    @patch('kamaki.cli.utils.bold', return_value='bold')
233
    def test_print_items(self, bold, PH, PL, PD, WR, PR):
234
        from kamaki.cli.utils import print_items, INDENT_TAB
235
        for args in product(
236
                (
237
                    42, None, 'simple outputs',
238
                    [1, 2, 3], {1: 1, 2: 2}, (3, 4),
239
                    ({'k': 1, 'id': 2}, [5, 6, 7], (8, 9), '10')),
240
                (('id', 'name'), ('something', 2), ('lala', )),
241
                (False, True),
242
                (False, True),
243
                (0, 1, 2, 10)):
244
            items, title, with_enumeration, with_redundancy, page_size = args
245
            wr_counter, pr_counter = len(WR.mock_calls), len(PR.mock_calls)
246
            pl_counter, pd_counter = len(PL.mock_calls), len(PD.mock_calls)
247
            bold_counter, ph_counter = len(bold.mock_calls), len(PH.mock_calls)
248
            print_items(*args)
249
            if not (isinstance(items, dict) or isinstance(
250
                    items, list) or isinstance(items, tuple)):
251
                if items:
252
                    self.assertEqual(PR.mock_calls[-1], call('%s' % items))
253
            else:
254
                for i, item in enumerate(items):
255
                    if with_enumeration:
256
                        self.assertEqual(
257
                            WR.mock_calls[wr_counter],
258
                            call('%s. ' % (i + 1)))
259
                        wr_counter += 1
260
                    if isinstance(item, dict):
261
                        title = sorted(set(title).intersection(item))
262
                        pick = item.get if with_redundancy else item.pop
263
                        header = ' '.join('%s' % pick(key) for key in title)
264
                        self.assertEqual(
265
                            bold.mock_calls[bold_counter], call(header))
266
                        self.assertEqual(
267
                            PR.mock_calls[pr_counter], call('bold'))
268
                        self.assertEqual(
269
                            PD.mock_calls[pd_counter],
270
                            call(item, indent=INDENT_TAB))
271
                        pr_counter += 1
272
                        pd_counter += 1
273
                        bold_counter += 1
274
                    elif isinstance(item, list) or isinstance(item, tuple):
275
                        self.assertEqual(
276
                            PL.mock_calls[pl_counter],
277
                            call(item, indent=INDENT_TAB))
278
                        pl_counter += 1
279
                    else:
280
                        self.assertEqual(
281
                            PR.mock_calls[pr_counter], call(' %s' % item))
282
                        pr_counter += 1
283
                    page_size = page_size if page_size > 0 else len(items)
284
                    self.assertEqual(
285
                        PH.mock_calls[ph_counter],
286
                        call(i + 1, page_size, len(items)))
287
                    ph_counter += 1
288

    
289
    def test_format_size(self):
290
        from kamaki.cli.utils import format_size
291
        from kamaki.cli import CLIError
292
        for v in ('wrong', {1: '1', 2: '2'}, ('tuples', 'not OK'), [1, 2]):
293
            self.assertRaises(CLIError, format_size, v)
294
        for step, B, K, M, G, T in (
295
                (1000, 'B', 'KB', 'MB', 'GB', 'TB'),
296
                (1024, 'B', 'KiB', 'MiB', 'GiB', 'TiB')):
297
            Ki, Mi, Gi = step, step * step, step * step * step
298
            for before, after in (
299
                    (0, '0' + B), (512, '512' + B), (
300
                        Ki - 1, '%s%s' % (step - 1, B)),
301
                    (Ki, '1' + K), (42 * Ki, '42' + K), (
302
                        Mi - 1, '%s.99%s' % (step - 1, K)),
303
                    (Mi, '1' + M), (42 * Mi, '42' + M), (
304
                        Ki * Mi - 1, '%s.99%s' % (step - 1, M)),
305
                    (Gi, '1' + G), (42 * Gi, '42' + G), (
306
                        Mi * Mi - 1, '%s.99%s' % (step - 1, G)),
307
                    (Mi * Mi, '1' + T), (42 * Mi * Mi, '42' + T), (
308
                        Mi * Gi - 1, '%s.99%s' % (step - 1, T)), (
309
                        42 * Mi * Gi, '%s%s' % (42 * Ki, T))):
310
                self.assertEqual(format_size(before, step == 1000), after)
311

    
312
    def test_to_bytes(self):
313
        from kamaki.cli.utils import to_bytes
314
        for v in ('wrong', 'KABUM', 'kbps', 'kibps'):
315
            self.assertRaises(ValueError, to_bytes, v, 'B')
316
            self.assertRaises(ValueError, to_bytes, 42, v)
317
        for v in ([1, 2, 3], ('kb', 'mb'), {'kb': 1, 'byte': 2}):
318
            self.assertRaises(TypeError, to_bytes, v, 'B')
319
            self.assertRaises(AttributeError, to_bytes, 42, v)
320
        kl, ki = 1000, 1024
321
        for size, (unit, factor) in product(
322
                (0, 42, 3.14, 1023, 10000),
323
                (
324
                    ('B', 1), ('b', 1),
325
                    ('KB', kl), ('KiB', ki),
326
                    ('mb', kl * kl), ('mIb', ki * ki),
327
                    ('gB', kl * kl * kl), ('GIB', ki * ki * ki),
328
                    ('TB', kl * kl * kl * kl), ('tiB', ki * ki * ki * ki))):
329
            self.assertEqual(to_bytes(size, unit), int(size * factor))
330

    
331
    def test_dict2file(self):
332
        from kamaki.cli.utils import dict2file, INDENT_TAB
333
        for d, depth in product((
334
                    {'k': 42},
335
                    {'k1': 'v1', 'k2': [1, 2, 3], 'k3': {'k': 'v'}},
336
                    {'k1': {
337
                        'k1.1': 'v1.1',
338
                        'k1.2': [1, 2, 3],
339
                        'k1.3': {'k': 'v'}}}),
340
                (-42, 0, 42)):
341
            exp = ''
342
            exp_d = []
343
            exp_l = []
344
            exp, exp_d, exp_l = '', [], []
345
            with NamedTemporaryFile() as f:
346
                for k, v in d.items():
347
                    sfx = '\n'
348
                    if isinstance(v, dict):
349
                        exp_d.append(call(v, f, depth + 1))
350
                    elif isinstance(v, tuple) or isinstance(v, list):
351
                        exp_l.append(call(v, f, depth + 1))
352
                    else:
353
                        sfx = '%s\n' % v
354
                    exp += '%s%s: %s' % (
355
                        ' ' * (depth * INDENT_TAB), k, sfx)
356
                with patch('kamaki.cli.utils.dict2file') as D2F:
357
                    with patch('kamaki.cli.utils.list2file') as L2F:
358
                        dict2file(d, f, depth)
359
                        f.seek(0)
360
                        self.assertEqual(f.read(), exp)
361
                        self.assertEqual(L2F.mock_calls, exp_l)
362
                        self.assertEqual(D2F.mock_calls, exp_d)
363

    
364
    def test_list2file(self):
365
        from kamaki.cli.utils import list2file, INDENT_TAB
366
        for l, depth in product(
367
                (
368
                    (1, 2, 3),
369
                    [1, 2, 3],
370
                    ('v', [1, 2, 3], (1, 2, 3), {'1': 1, 2: '2', 3: 3}),
371
                    ['v', {'k1': 'v1', 'k2': [1, 2, 3], 'k3': {1: '1'}}]),
372
                (-42, 0, 42)):
373
            with NamedTemporaryFile() as f:
374
                exp, exp_d, exp_l = '', [], []
375
                for v in l:
376
                    if isinstance(v, dict):
377
                        exp_d.append(call(v, f, depth + 1))
378
                    elif isinstance(v, list) or isinstance(v, tuple):
379
                        exp_l.append(call(v, f, depth + 1))
380
                    else:
381
                        exp += '%s%s\n' % (' ' * INDENT_TAB * depth, v)
382
                with patch('kamaki.cli.utils.dict2file') as D2F:
383
                    with patch('kamaki.cli.utils.list2file') as L2F:
384
                        list2file(l, f, depth)
385
                        f.seek(0)
386
                        self.assertEqual(f.read(), exp)
387
                        self.assertEqual(L2F.mock_calls, exp_l)
388
                        self.assertEqual(D2F.mock_calls, exp_d)
389

    
390
    def test__parse_with_regex(self):
391
        from re import compile as r_compile
392
        from kamaki.cli.utils import _parse_with_regex
393
        for args in product(
394
                (
395
                    'this is a line',
396
                    'this_is_also_a_line',
397
                    'This "text" is quoted',
398
                    'This "quoted" "text" is more "complicated"',
399
                    'Is this \'quoted\' text "double \'quoted\' or not?"',
400
                    '"What \'about\' the" oposite?',
401
                    ' Try with a " single double quote',
402
                    'Go "down \'deep " deeper \'bottom \' up" go\' up" !'),
403
                (
404
                    '\'.*?\'|".*?"|^[\S]*$',
405
                    r'"([A-Za-z0-9_\./\\-]*)"',
406
                    r'\"(.+?)\"',
407
                    '\\^a\\.\\*\\$')):
408
            r_parser = r_compile(args[1])
409
            self.assertEqual(
410
                _parse_with_regex(*args),
411
                (r_parser.split(args[0]), r_parser.findall(args[0])))
412

    
413
    def test_split_input(self):
414
        from kamaki.cli.utils import split_input
415
        for line, expected in (
416
                ('unparsable', ['unparsable']),
417
                ('"parsable"', ['parsable']),
418
                ('"parse" out', ['parse', 'out']),
419
                ('"one', ['"one']),
420
                ('two" or" more"', ['two', ' or', 'more"']),
421
                ('Go "down \'deep " deeper \'bottom \' up" go\' up" !', [
422
                    'Go', "down 'deep ", 'deeper', 'bottom ',
423
                    'up', " go' up", '!']),
424
                ('Is "this" a \'parsed\' string?', [
425
                    'Is', 'this', 'a', 'parsed', 'string?'])):
426
            self.assertEqual(split_input(line), expected)
427

    
428
    @patch('kamaki.cli.utils._readline', return_value='read line')
429
    @patch('kamaki.cli.utils._flush')
430
    @patch('kamaki.cli.utils._write')
431
    def test_ask_user(self, WR, FL, RL):
432
        from kamaki.cli.utils import ask_user
433
        msg = 'some question'
434
        self.assertFalse(ask_user(msg))
435
        WR.assert_called_once_with('%s [y/N]: ' % msg)
436
        FL.assert_called_once_with()
437
        RL.assert_called_once_with()
438

    
439
        self.assertTrue(ask_user(msg, ('r', )))
440
        self.assertEqual(WR.mock_calls[-1], call('%s [r/N]: ' % msg))
441
        self.assertEqual(FL.mock_calls, 2 * [call()])
442
        self.assertEqual(RL.mock_calls, 2 * [call()])
443

    
444
        self.assertTrue(ask_user(msg, ('Z', 'r', 'k')))
445
        self.assertEqual(WR.mock_calls[-1], call('%s [Z, r, k/N]: ' % msg))
446
        self.assertEqual(FL.mock_calls, 3 * [call()])
447
        self.assertEqual(RL.mock_calls, 3 * [call()])
448

    
449
    @patch('kamaki.cli.utils._flush')
450
    @patch('kamaki.cli.utils._write')
451
    def test_spiner(self, WR, FL):
452
        from kamaki.cli.utils import spiner
453
        spins = ('/', '-', '\\', '|')
454
        prev = 1
455
        for i, SP in enumerate(spiner(6)):
456
            if not i:
457
                self.assertEqual(WR.mock_calls[-2], call(' '))
458
            elif i > 5:
459
                break
460
            self.assertEqual(SP, None)
461
            self.assertEqual(WR.mock_calls[-1], call('\b%s' % spins[i % 4]))
462
            self.assertEqual(FL.mock_calls, prev * [call()])
463
            prev += 1
464

    
465
    def test_remove_from_items(self):
466
        from kamaki.cli.utils import remove_from_items
467
        for v in ('wrong', [1, 2, 3], [{}, 2, {}]):
468
            self.assertRaises(AssertionError, remove_from_items, v, 'none')
469
        d = dict(k1=1, k2=dict(k2=2, k3=3), k3=3, k4=4)
470
        for k in (d.keys() + ['kN']):
471
            tmp1, tmp2 = dict(d), dict(d)
472
            remove_from_items([tmp1, ], k)
473
            tmp1.pop(k, None)
474
            self.assert_dicts_are_equal(tmp1, tmp2)
475
        for k in (d.keys() + ['kN']):
476
            tmp1, tmp2 = dict(d), dict(d)
477
            remove_from_items([tmp1, tmp2], k)
478
            self.assert_dicts_are_equal(tmp1, tmp2)
479

    
480
    def test_filter_dicts_by_dict(self):
481
        from kamaki.cli.utils import filter_dicts_by_dict
482

    
483
        dlist = [
484
            dict(k1='v1', k2='v2', k3='v3'),
485
            dict(k1='v1'),
486
            dict(k2='v2', k3='v3'),
487
            dict(k1='V1', k3='V3'),
488
            dict()]
489
        for l, f, em, cs, exp in (
490
                (dlist, dlist[2], True, False, dlist[0:1] + dlist[2:3]),
491
                (dlist, dlist[1], True, False, dlist[0:2] + dlist[3:4]),
492
                (dlist, dlist[1], True, True, dlist[0:2]),
493
                (dlist, {'k3': 'v'}, True, False, []),
494
                (dlist, {'k3': 'v'}, False, False, dlist[0:1] + dlist[2:4]),
495
                (dlist, {'k3': 'v'}, False, True, dlist[0:1] + dlist[2:3]),
496
                (dlist, {'k3': 'v'}, True, True, []),
497
                (dlist, dlist[4], True, False, dlist),
498
                ):
499
            self.assertEqual(exp, filter_dicts_by_dict(l, f, em, cs))
500

    
501

    
502
if __name__ == '__main__':
503
    from sys import argv
504
    from kamaki.cli.test import runTestCase
505
    runTestCase(UtilsMethods, 'UtilsMethods', argv[1:])