Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / test.py @ b3cb58c2

History | View | Annotate | Download (13.4 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import makeSuite, TestSuite, TextTestRunner, TestCase
35
from inspect import getmembers, isclass
36
from tempfile import NamedTemporaryFile
37
from mock import patch, call
38
from itertools import product
39

    
40
from kamaki.cli.command_tree.test import Command, CommandTree
41
from kamaki.cli.argument.test import (
42
    Argument, ConfigArgument, RuntimeConfigArgument, FlagArgument,
43
    ValueArgument, IntArgument, DateArgument, VersionArgument,
44
    KeyValueArgument, ProgressBarArgument, ArgumentParseManager)
45
from kamaki.cli.utils.test import UtilsMethods
46

    
47

    
48
class History(TestCase):
49

    
50
    def setUp(self):
51
        from kamaki.cli.history import History as HClass
52
        self.HCLASS = HClass
53
        self.file = NamedTemporaryFile()
54

    
55
    def tearDown(self):
56
        self.file.close()
57

    
58
    def test__match(self):
59
        self.assertRaises(AttributeError, self.HCLASS._match, 'ok', 42)
60
        self.assertRaises(TypeError, self.HCLASS._match, 2.71, 'ok')
61
        for args, expected in (
62
                (('XXX', None), True),
63
                ((None, None), True),
64
                (('this line has some terms', 'some terms'), True),
65
                (('this line has some terms', 'some bad terms'), False),
66
                (('small line', 'not so small line terms'), False),
67
                ((['line', 'with', 'some', 'terms'], 'some terms'), True),
68
                ((['line', 'with', 'some terms'], 'some terms'), False)):
69
            self.assertEqual(self.HCLASS._match(*args), expected)
70

    
71
    def test_get(self):
72
        history = self.HCLASS(self.file.name)
73
        self.assertEqual(history.get(), [])
74

    
75
        sample_history = (
76
            'kamaki history show\n',
77
            'kamaki file list\n',
78
            'kamaki touch pithos:f1\n',
79
            'kamaki file info pithos:f1\n')
80
        self.file.write(''.join(sample_history))
81
        self.file.flush()
82

    
83
        expected = ['%s.  \t%s' % (
84
            i + 1, event) for i, event in enumerate(sample_history)]
85
        self.assertEqual(history.get(), expected)
86
        self.assertEqual(history.get('kamaki'), expected)
87
        self.assertEqual(history.get('file kamaki'), expected[1::2])
88
        self.assertEqual(history.get('pithos:f1'), expected[2:])
89
        self.assertEqual(history.get('touch pithos:f1'), expected[2:3])
90

    
91
        for limit in range(len(sample_history)):
92
            self.assertEqual(history.get(limit=limit), expected[-limit:])
93
            self.assertEqual(
94
                history.get('kamaki', limit=limit), expected[-limit:])
95

    
96
    def test_add(self):
97
        history = self.HCLASS(self.file.name)
98
        some_strings = ('a brick', 'two bricks', 'another brick', 'A wall!')
99
        for i, line in enumerate(some_strings):
100
            history.add(line)
101
            self.file.seek(0)
102
            self.assertEqual(
103
                self.file.read(), '\n'.join(some_strings[:(i + 1)]) + '\n')
104

    
105
    def test_clean(self):
106
        content = 'a brick\ntwo bricks\nanother brick\nA wall!\n'
107
        self.file.write(content)
108
        self.file.flush()
109
        self.file.seek(0)
110
        self.assertEqual(self.file.read(), content)
111
        history = self.HCLASS(self.file.name)
112
        history.clean()
113
        self.file.seek(0)
114
        self.assertEqual(self.file.read(), '')
115

    
116
    def test_retrieve(self):
117
        sample_history = (
118
            'kamaki history show\n',
119
            'kamaki file list\n',
120
            'kamaki touch pithos:f1\n',
121
            'kamaki file info pithos:f1\n',
122
            'current / last command is always excluded')
123
        self.file.write(''.join(sample_history))
124
        self.file.flush()
125

    
126
        history = self.HCLASS(self.file.name)
127
        self.assertRaises(ValueError, history.retrieve, 'must be number')
128
        self.assertRaises(TypeError, history.retrieve, [1, 2, 3])
129

    
130
        for i in (0, len(sample_history), -len(sample_history)):
131
            self.assertEqual(history.retrieve(i), None)
132
        for i in range(1, len(sample_history)):
133
            self.assertEqual(history.retrieve(i), sample_history[i - 1])
134
            self.assertEqual(history.retrieve(- i), sample_history[- i - 1])
135

    
136

    
137
class LoggerMethods(TestCase):
138

    
139
    class PseudoLogger(object):
140
        level = 'some level'
141
        _setLevel_calls = []
142
        _addHandler_calls = []
143

    
144
        def setLevel(self, *args):
145
            self._setLevel_calls.append(args)
146

    
147
        def addHandler(self, *args):
148
            self._addHandler_calls.append(args)
149

    
150
    class PseudoHandler(object):
151
        _setFormatter_calls = []
152

    
153
        def setFormatter(self, *args):
154
            self._setFormatter_calls.append(args)
155

    
156
    def setUp(self):
157
        from kamaki.cli.logger import LOG_FILE, _blacklist
158
        self.LF, self.BL = list(LOG_FILE), dict(_blacklist)
159

    
160
    def tearDown(self):
161
        self.PseudoLogger._setLevel_calls = []
162
        self.PseudoLogger._addHandler_calls = []
163
        self.PseudoLogger._setFormatter_calls = []
164
        from kamaki.cli.logger import LOG_FILE, _blacklist
165
        for e in LOG_FILE:
166
            LOG_FILE.pop()
167
        for e in self.LF:
168
            LOG_FILE.append(e)
169
        _blacklist.clear()
170
        _blacklist.update(self.BL)
171

    
172
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
173
    def test_deactivate(self, GL):
174
        from kamaki.cli.logger import deactivate, _blacklist
175
        self.assertEqual(_blacklist, {})
176
        deactivate('some logger')
177
        GL.assert_called_once_with('some logger')
178
        self.assertEqual(
179
            _blacklist.get('some logger', None), self.PseudoLogger.level)
180
        from logging import CRITICAL
181
        self.assertEqual(self.PseudoLogger._setLevel_calls[-1], (CRITICAL, ))
182

    
183
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
184
    def test_activate(self, GL):
185
        from kamaki.cli.logger import activate
186
        activate('another logger')
187
        GL.assert_called_once_with('another logger')
188
        self.assertEqual(
189
            self.PseudoLogger._setLevel_calls[-1], (self.PseudoLogger.level, ))
190

    
191
    def test_get_log_filename(self):
192
        from kamaki.cli.logger import get_log_filename, LOG_FILE
193
        f = NamedTemporaryFile()
194
        for e in LOG_FILE:
195
            LOG_FILE.pop()
196
        LOG_FILE.append(f.name)
197
        self.assertEqual(get_log_filename(), f.name)
198
        LOG_FILE.pop()
199
        LOG_FILE.append(2 * f.name)
200
        print('\n  Should print error msg here: ')
201
        self.assertEqual(get_log_filename(), None)
202

    
203
    def test_set_log_filename(self):
204
        from kamaki.cli.logger import set_log_filename, LOG_FILE
205
        for n in ('some name', 'some other name'):
206
            set_log_filename(n)
207
            self.assertEqual(LOG_FILE[0], n)
208

    
209
    @patch('kamaki.cli.logger.get_logger', return_value=PseudoLogger())
210
    @patch('kamaki.cli.logger.logging.Formatter', return_value='f0rm4t')
211
    @patch(
212
        'kamaki.cli.logger.logging.StreamHandler',
213
        return_value=PseudoHandler())
214
    @patch(
215
        'kamaki.cli.logger.logging.FileHandler',
216
        return_value=PseudoHandler())
217
    def test__add_logger(self, FH, SH, F, GL):
218
        from kamaki.cli.logger import _add_logger
219
        from logging import DEBUG
220
        stdf, cnt = '%(name)s\n %(message)s', 0
221
        for name, level, filename, fmt in product(
222
                ('my logger', ),
223
                ('my level', None),
224
                ('my filename', None),
225
                ('my fmt', None)):
226
            log = _add_logger(name, level, filename, fmt)
227
            self.assertTrue(isinstance(log, self.PseudoLogger))
228
            self.assertEqual(GL.mock_calls[-1], call(name))
229
            if filename:
230
                self.assertEqual(FH.mock_calls[-1], call(filename))
231
            else:
232
                self.assertEqual(SH.mock_calls[-1], call())
233
            self.assertEqual(F.mock_calls[-1], call(fmt or stdf))
234
            self.assertEqual(
235
                self.PseudoHandler._setFormatter_calls[-1], ('f0rm4t', ))
236
            cnt += 1
237
            self.assertEqual(len(self.PseudoLogger._addHandler_calls), cnt)
238
            h = self.PseudoLogger._addHandler_calls[-1]
239
            self.assertTrue(isinstance(h[0], self.PseudoHandler))
240
            l = self.PseudoLogger._setLevel_calls[-1]
241
            self.assertEqual(l, (level or DEBUG, ))
242

    
243
    @patch('kamaki.cli.logger.get_log_filename', return_value='my log fname')
244
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
245
    def test_add_file_logger(self, GL, GLF):
246
        from kamaki.cli.logger import add_file_logger
247
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
248
            GLFcount = GLF.call_count
249
            for name, level, filename in product(
250
                    ('my name'), ('my level', None), ('my filename', None)):
251
                self.assertEqual(add_file_logger(name, level, filename), 'AL')
252
                self.assertEqual(AL.mock_calls[-1], call(
253
                    name, level, filename or 'my log fname',
254
                    fmt='%(name)s(%(levelname)s) %(asctime)s\n\t%(message)s'))
255
                if filename:
256
                    self.assertEqual(GLFcount, GLF.call_count)
257
                else:
258
                    GLFcount = GLF.call_count
259
                    self.assertEqual(GLF.mock_calls[-1], call())
260
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
261
            self.assertEqual(add_file_logger('X'), 'my get logger ret')
262
            GL.assert_called_once_with('X')
263

    
264
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
265
    def test_add_stream_logger(self, GL):
266
        from kamaki.cli.logger import add_stream_logger
267
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
268
            for name, level, fmt in product(
269
                    ('my name'), ('my level', None), ('my fmt', None)):
270
                self.assertEqual(add_stream_logger(name, level, fmt), 'AL')
271
                self.assertEqual(AL.mock_calls[-1], call(name, level, fmt=fmt))
272
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
273
            self.assertEqual(add_stream_logger('X'), 'my get logger ret')
274
            GL.assert_called_once_with('X')
275

    
276
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
277
    def test_get_logger(self, GL):
278
        from kamaki.cli.logger import get_logger
279
        get_logger('my logger name')
280
        GL.assert_called_once_with('my logger name')
281

    
282

    
283
#  TestCase auxiliary methods
284

    
285
def runTestCase(cls, test_name, args=[], failure_collector=[]):
286
    """
287
    :param cls: (TestCase) a set of Tests
288

289
    :param test_name: (str)
290

291
    :param args: (list) these are prefixed with test_ and used as params when
292
        instantiating cls
293

294
    :param failure_collector: (list) collects info of test failures
295

296
    :returns: (int) total # of run tests
297
    """
298
    suite = TestSuite()
299
    if args:
300
        suite.addTest(cls('_'.join(['test'] + args)))
301
    else:
302
        suite.addTest(makeSuite(cls))
303
    print('* Test * %s *' % test_name)
304
    r = TextTestRunner(verbosity=2).run(suite)
305
    failure_collector += r.failures
306
    return r.testsRun
307

    
308

    
309
def get_test_classes(module=__import__(__name__), name=''):
310
    module_stack = [module]
311
    while module_stack:
312
        module = module_stack[-1]
313
        module_stack = module_stack[:-1]
314
        for objname, obj in getmembers(module):
315
            if (objname == name or not name):
316
                if isclass(obj) and objname != 'TestCase' and (
317
                        issubclass(obj, TestCase)):
318
                    yield (obj, objname)
319

    
320

    
321
def main(argv):
322
    found = False
323
    failure_collector = list()
324
    num_of_tests = 0
325
    for cls, name in get_test_classes(name=argv[1] if len(argv) > 1 else ''):
326
        found = True
327
        num_of_tests += runTestCase(cls, name, argv[2:], failure_collector)
328
    if not found:
329
        print('Test "%s" not found' % ' '.join(argv[1:]))
330
    else:
331
        for i, failure in enumerate(failure_collector):
332
            print('Failure %s: ' % (i + 1))
333
            for field in failure:
334
                print('\t%s' % field)
335
        print('\nTotal tests run: %s' % num_of_tests)
336
        print('Total failures: %s' % len(failure_collector))
337

    
338

    
339
if __name__ == '__main__':
340
    from sys import argv
341
    main(argv)