Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / test.py @ cdeadadc

History | View | Annotate | Download (16.1 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import makeSuite, TestSuite, TextTestRunner, TestCase
35
from inspect import getmembers, isclass
36
from tempfile import NamedTemporaryFile
37
from mock import patch, call
38
from itertools import product
39

    
40
from kamaki.cli.command_tree.test import Command, CommandTree
41
from kamaki.cli.argument.test import (
42
    Argument, ConfigArgument, RuntimeConfigArgument, FlagArgument,
43
    ValueArgument, IntArgument, DateArgument, VersionArgument,
44
    KeyValueArgument, ProgressBarArgument, ArgumentParseManager)
45

    
46

    
47
class History(TestCase):
48

    
49
    def setUp(self):
50
        from kamaki.cli.history import History as HClass
51
        self.HCLASS = HClass
52
        self.file = NamedTemporaryFile()
53

    
54
    def tearDown(self):
55
        self.file.close()
56

    
57
    def test__match(self):
58
        self.assertRaises(AttributeError, self.HCLASS._match, 'ok', 42)
59
        self.assertRaises(TypeError, self.HCLASS._match, 2.71, 'ok')
60
        for args, expected in (
61
                (('XXX', None), True),
62
                ((None, None), True),
63
                (('this line has some terms', 'some terms'), True),
64
                (('this line has some terms', 'some bad terms'), False),
65
                (('small line', 'not so small line terms'), False),
66
                ((['line', 'with', 'some', 'terms'], 'some terms'), True),
67
                ((['line', 'with', 'some terms'], 'some terms'), False)):
68
            self.assertEqual(self.HCLASS._match(*args), expected)
69

    
70
    def test_get(self):
71
        history = self.HCLASS(self.file.name)
72
        self.assertEqual(history.get(), [])
73

    
74
        sample_history = (
75
            'kamaki history show\n',
76
            'kamaki file list\n',
77
            'kamaki touch pithos:f1\n',
78
            'kamaki file info pithos:f1\n')
79
        self.file.write(''.join(sample_history))
80
        self.file.flush()
81

    
82
        expected = ['%s.  \t%s' % (
83
            i + 1, event) for i, event in enumerate(sample_history)]
84
        self.assertEqual(history.get(), expected)
85
        self.assertEqual(history.get('kamaki'), expected)
86
        self.assertEqual(history.get('file kamaki'), expected[1::2])
87
        self.assertEqual(history.get('pithos:f1'), expected[2:])
88
        self.assertEqual(history.get('touch pithos:f1'), expected[2:3])
89

    
90
        for limit in range(len(sample_history)):
91
            self.assertEqual(history.get(limit=limit), expected[-limit:])
92
            self.assertEqual(
93
                history.get('kamaki', limit=limit), expected[-limit:])
94

    
95
    def test_add(self):
96
        history = self.HCLASS(self.file.name)
97
        some_strings = ('a brick', 'two bricks', 'another brick', 'A wall!')
98
        for i, line in enumerate(some_strings):
99
            history.add(line)
100
            self.file.seek(0)
101
            self.assertEqual(
102
                self.file.read(), '\n'.join(some_strings[:(i + 1)]) + '\n')
103

    
104
    def test_clean(self):
105
        content = 'a brick\ntwo bricks\nanother brick\nA wall!\n'
106
        self.file.write(content)
107
        self.file.flush()
108
        self.file.seek(0)
109
        self.assertEqual(self.file.read(), content)
110
        history = self.HCLASS(self.file.name)
111
        history.clean()
112
        self.file.seek(0)
113
        self.assertEqual(self.file.read(), '')
114

    
115
    def test_retrieve(self):
116
        sample_history = (
117
            'kamaki history show\n',
118
            'kamaki file list\n',
119
            'kamaki touch pithos:f1\n',
120
            'kamaki file info pithos:f1\n',
121
            'current / last command is always excluded')
122
        self.file.write(''.join(sample_history))
123
        self.file.flush()
124

    
125
        history = self.HCLASS(self.file.name)
126
        self.assertRaises(ValueError, history.retrieve, 'must be number')
127
        self.assertRaises(TypeError, history.retrieve, [1, 2, 3])
128

    
129
        for i in (0, len(sample_history), -len(sample_history)):
130
            self.assertEqual(history.retrieve(i), None)
131
        for i in range(1, len(sample_history)):
132
            self.assertEqual(history.retrieve(i), sample_history[i - 1])
133
            self.assertEqual(history.retrieve(- i), sample_history[- i - 1])
134

    
135

    
136
class LoggerMethods(TestCase):
137

    
138
    class PseudoLogger(object):
139
        level = 'some level'
140
        _setLevel_calls = []
141
        _addHandler_calls = []
142

    
143
        def setLevel(self, *args):
144
            self._setLevel_calls.append(args)
145

    
146
        def addHandler(self, *args):
147
            self._addHandler_calls.append(args)
148

    
149
    class PseudoHandler(object):
150
        _setFormatter_calls = []
151

    
152
        def setFormatter(self, *args):
153
            self._setFormatter_calls.append(args)
154

    
155
    def setUp(self):
156
        from kamaki.cli.logger import LOG_FILE, _blacklist
157
        self.LF, self.BL = list(LOG_FILE), dict(_blacklist)
158

    
159
    def tearDown(self):
160
        self.PseudoLogger._setLevel_calls = []
161
        self.PseudoLogger._addHandler_calls = []
162
        self.PseudoLogger._setFormatter_calls = []
163
        from kamaki.cli.logger import LOG_FILE, _blacklist
164
        for e in LOG_FILE:
165
            LOG_FILE.pop()
166
        for e in self.LF:
167
            LOG_FILE.append(e)
168
        _blacklist.clear()
169
        _blacklist.update(self.BL)
170

    
171
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
172
    def test_deactivate(self, GL):
173
        from kamaki.cli.logger import deactivate, _blacklist
174
        self.assertEqual(_blacklist, {})
175
        deactivate('some logger')
176
        GL.assert_called_once_with('some logger')
177
        self.assertEqual(
178
            _blacklist.get('some logger', None), self.PseudoLogger.level)
179
        from logging import CRITICAL
180
        self.assertEqual(self.PseudoLogger._setLevel_calls[-1], (CRITICAL, ))
181

    
182
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
183
    def test_activate(self, GL):
184
        from kamaki.cli.logger import activate
185
        activate('another logger')
186
        GL.assert_called_once_with('another logger')
187
        self.assertEqual(
188
            self.PseudoLogger._setLevel_calls[-1], (self.PseudoLogger.level, ))
189

    
190
    def test_get_log_filename(self):
191
        from kamaki.cli.logger import get_log_filename, LOG_FILE
192
        f = NamedTemporaryFile()
193
        for e in LOG_FILE:
194
            LOG_FILE.pop()
195
        LOG_FILE.append(f.name)
196
        self.assertEqual(get_log_filename(), f.name)
197
        LOG_FILE.pop()
198
        LOG_FILE.append(2 * f.name)
199
        print('\n  Should print error msg here: ')
200
        self.assertEqual(get_log_filename(), None)
201

    
202
    def test_set_log_filename(self):
203
        from kamaki.cli.logger import set_log_filename, LOG_FILE
204
        for n in ('some name', 'some other name'):
205
            set_log_filename(n)
206
            self.assertEqual(LOG_FILE[0], n)
207

    
208
    @patch('kamaki.cli.logger.get_logger', return_value=PseudoLogger())
209
    @patch('kamaki.cli.logger.logging.Formatter', return_value='f0rm4t')
210
    @patch(
211
        'kamaki.cli.logger.logging.StreamHandler',
212
        return_value=PseudoHandler())
213
    @patch(
214
        'kamaki.cli.logger.logging.FileHandler',
215
        return_value=PseudoHandler())
216
    def test__add_logger(self, FH, SH, F, GL):
217
        from kamaki.cli.logger import _add_logger
218
        from logging import DEBUG
219
        stdf, cnt = '%(name)s\n %(message)s', 0
220
        for name, level, filename, fmt in product(
221
                ('my logger', ),
222
                ('my level', None),
223
                ('my filename', None),
224
                ('my fmt', None)):
225
            log = _add_logger(name, level, filename, fmt)
226
            self.assertTrue(isinstance(log, self.PseudoLogger))
227
            self.assertEqual(GL.mock_calls[-1], call(name))
228
            if filename:
229
                self.assertEqual(FH.mock_calls[-1], call(filename))
230
            else:
231
                self.assertEqual(SH.mock_calls[-1], call())
232
            self.assertEqual(F.mock_calls[-1], call(fmt or stdf))
233
            self.assertEqual(
234
                self.PseudoHandler._setFormatter_calls[-1], ('f0rm4t', ))
235
            cnt += 1
236
            self.assertEqual(len(self.PseudoLogger._addHandler_calls), cnt)
237
            h = self.PseudoLogger._addHandler_calls[-1]
238
            self.assertTrue(isinstance(h[0], self.PseudoHandler))
239
            l = self.PseudoLogger._setLevel_calls[-1]
240
            self.assertEqual(l, (level or DEBUG, ))
241

    
242
    @patch('kamaki.cli.logger.get_log_filename', return_value='my log fname')
243
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
244
    def test_add_file_logger(self, GL, GLF):
245
        from kamaki.cli.logger import add_file_logger
246
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
247
            GLFcount = GLF.call_count
248
            for name, level, filename in product(
249
                    ('my name'), ('my level', None), ('my filename', None)):
250
                self.assertEqual(add_file_logger(name, level, filename), 'AL')
251
                self.assertEqual(AL.mock_calls[-1], call(
252
                    name, level, filename or 'my log fname',
253
                    fmt='%(name)s(%(levelname)s) %(asctime)s\n\t%(message)s'))
254
                if filename:
255
                    self.assertEqual(GLFcount, GLF.call_count)
256
                else:
257
                    GLFcount = GLF.call_count
258
                    self.assertEqual(GLF.mock_calls[-1], call())
259
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
260
            self.assertEqual(add_file_logger('X'), 'my get logger ret')
261
            GL.assert_called_once_with('X')
262

    
263
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
264
    def test_add_stream_logger(self, GL):
265
        from kamaki.cli.logger import add_stream_logger
266
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
267
            for name, level, fmt in product(
268
                    ('my name'), ('my level', None), ('my fmt', None)):
269
                self.assertEqual(add_stream_logger(name, level, fmt), 'AL')
270
                self.assertEqual(AL.mock_calls[-1], call(name, level, fmt=fmt))
271
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
272
            self.assertEqual(add_stream_logger('X'), 'my get logger ret')
273
            GL.assert_called_once_with('X')
274

    
275
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
276
    def test_get_logger(self, GL):
277
        from kamaki.cli.logger import get_logger
278
        get_logger('my logger name')
279
        GL.assert_called_once_with('my logger name')
280

    
281

    
282
class UtilsMethods(TestCase):
283

    
284
    def assert_dicts_are_equal(self, d1, d2):
285
        for k, v in d1.items():
286
            self.assertTrue(k in d2)
287
            if isinstance(v, dict):
288
                self.assert_dicts_are_equal(v, d2[k])
289
            else:
290
                self.assertEqual(unicode(v), unicode(d2[k]))
291

    
292
    def test_guess_mime_type(self):
293
        from kamaki.cli.utils import guess_mime_type
294
        from mimetypes import guess_type
295
        for args in product(
296
                ('file.txt', 'file.png', 'file.zip', 'file.gz', None, 'X'),
297
                ('a type', None),
298
                ('an/encoding', None)):
299
            filename, ctype, cencoding = args
300
            if filename:
301
                exp_type, exp_enc = guess_type(filename)
302
                self.assertEqual(
303
                    guess_mime_type(*args),
304
                    (exp_type or ctype, exp_enc or cencoding))
305
            else:
306
                self.assertRaises(AssertionError, guess_mime_type, *args)
307

    
308
    def test_pretty_keys(self):
309
        from kamaki.cli.utils import pretty_keys
310
        for args, exp in (
311
                (
312
                    ({'k1': 'v1', 'k1_k2': 'v2'}, ),
313
                    {'k1': 'v1', 'k1 k2': 'v2'}),
314
                (
315
                    ({'k1': 'v1', 'k1_k2': 'v2'}, '1'),
316
                    {'k': 'v1', 'k _k2': 'v2'}),
317
                (
318
                    ({'k1_k2': 'v1', 'k1': {'k2': 'v2', 'k2_k3': 'v3'}}, ),
319
                    {'k1 k2': 'v1', 'k1': {'k2': 'v2', 'k2_k3': 'v3'}}),
320
                (
321
                    (
322
                        {'k1_k2': 'v1', 'k1': {'k2': 'v2', 'k2_k3': 'v3'}},
323
                        '_',
324
                        True),
325
                    {'k1 k2': 'v1', 'k1': {'k2': 'v2', 'k2 k3': 'v3'}}),
326
                (
327
                    (
328
                        {
329
                            'k1_k2': {'k_1': 'v_1', 'k_2': {'k_3': 'v_3'}},
330
                            'k1': {'k2': 'v2', 'k2_k3': 'v3'}},
331
                        '_',
332
                        True),
333
                    {
334
                        'k1 k2': {'k 1': 'v_1', 'k 2': {'k 3': 'v_3'}},
335
                        'k1': {'k2': 'v2', 'k2 k3': 'v3'}}),
336
                (
337
                    (
338
                        {
339
                            'k1_k2': {'k_1': 'v_1', 'k_2': {'k_3': 'v_3'}},
340
                            'k1': {'k2': 'v2', 'k2_k3': 'v3'}},
341
                        '1',
342
                        True),
343
                    {
344
                        'k _k2': {'k_': 'v_1', 'k_2': {'k_3': 'v_3'}},
345
                        'k': {'k2': 'v2', 'k2_k3': 'v3'}})
346
            ):
347
            initial = dict(args[0])
348
            self.assert_dicts_are_equal(pretty_keys(*args), exp)
349
            self.assert_dicts_are_equal(initial, args[0])
350

    
351

    
352
#  TestCase auxiliary methods
353

    
354
def runTestCase(cls, test_name, args=[], failure_collector=[]):
355
    """
356
    :param cls: (TestCase) a set of Tests
357

358
    :param test_name: (str)
359

360
    :param args: (list) these are prefixed with test_ and used as params when
361
        instantiating cls
362

363
    :param failure_collector: (list) collects info of test failures
364

365
    :returns: (int) total # of run tests
366
    """
367
    suite = TestSuite()
368
    if args:
369
        suite.addTest(cls('_'.join(['test'] + args)))
370
    else:
371
        suite.addTest(makeSuite(cls))
372
    print('* Test * %s *' % test_name)
373
    r = TextTestRunner(verbosity=2).run(suite)
374
    failure_collector += r.failures
375
    return r.testsRun
376

    
377

    
378
def get_test_classes(module=__import__(__name__), name=''):
379
    module_stack = [module]
380
    while module_stack:
381
        module = module_stack[-1]
382
        module_stack = module_stack[:-1]
383
        for objname, obj in getmembers(module):
384
            if (objname == name or not name):
385
                if isclass(obj) and objname != 'TestCase' and (
386
                        issubclass(obj, TestCase)):
387
                    yield (obj, objname)
388

    
389

    
390
def main(argv):
391
    found = False
392
    failure_collector = list()
393
    num_of_tests = 0
394
    for cls, name in get_test_classes(name=argv[1] if len(argv) > 1 else ''):
395
        found = True
396
        num_of_tests += runTestCase(cls, name, argv[2:], failure_collector)
397
    if not found:
398
        print('Test "%s" not found' % ' '.join(argv[1:]))
399
    else:
400
        for i, failure in enumerate(failure_collector):
401
            print('Failure %s: ' % (i + 1))
402
            for field in failure:
403
                print('\t%s' % field)
404
        print('\nTotal tests run: %s' % num_of_tests)
405
        print('Total failures: %s' % len(failure_collector))
406

    
407

    
408
if __name__ == '__main__':
409
    from sys import argv
410
    main(argv)