Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / test.py @ ca5528f1

History | View | Annotate | Download (13.4 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import makeSuite, TestSuite, TextTestRunner, TestCase
35
from inspect import getmembers, isclass
36
from tempfile import NamedTemporaryFile
37
from mock import patch, call
38
from itertools import product
39

    
40
from kamaki.cli.command_tree.test import Command, CommandTree
41
from kamaki.cli.argument.test import (
42
    Argument, ConfigArgument, RuntimeConfigArgument, FlagArgument,
43
    ValueArgument, IntArgument, DateArgument, VersionArgument,
44
    RepeatableArgument, KeyValueArgument, ProgressBarArgument,
45
    ArgumentParseManager)
46
from kamaki.cli.utils.test import UtilsMethods
47

    
48

    
49
class History(TestCase):
50

    
51
    def setUp(self):
52
        from kamaki.cli.history import History as HClass
53
        self.HCLASS = HClass
54
        self.file = NamedTemporaryFile()
55

    
56
    def tearDown(self):
57
        self.file.close()
58

    
59
    def test__match(self):
60
        self.assertRaises(AttributeError, self.HCLASS._match, 'ok', 42)
61
        self.assertRaises(TypeError, self.HCLASS._match, 2.71, 'ok')
62
        for args, expected in (
63
                (('XXX', None), True),
64
                ((None, None), True),
65
                (('this line has some terms', 'some terms'), True),
66
                (('this line has some terms', 'some bad terms'), False),
67
                (('small line', 'not so small line terms'), False),
68
                ((['line', 'with', 'some', 'terms'], 'some terms'), True),
69
                ((['line', 'with', 'some terms'], 'some terms'), False)):
70
            self.assertEqual(self.HCLASS._match(*args), expected)
71

    
72
    def test_get(self):
73
        history = self.HCLASS(self.file.name)
74
        self.assertEqual(history.get(), [])
75

    
76
        sample_history = (
77
            'kamaki history show\n',
78
            'kamaki file list\n',
79
            'kamaki touch pithos:f1\n',
80
            'kamaki file info pithos:f1\n')
81
        self.file.write(''.join(sample_history))
82
        self.file.flush()
83

    
84
        expected = ['%s.  \t%s' % (
85
            i + 1, event) for i, event in enumerate(sample_history)]
86
        self.assertEqual(history.get(), expected)
87
        self.assertEqual(history.get('kamaki'), expected)
88
        self.assertEqual(history.get('file kamaki'), expected[1::2])
89
        self.assertEqual(history.get('pithos:f1'), expected[2:])
90
        self.assertEqual(history.get('touch pithos:f1'), expected[2:3])
91

    
92
        for limit in range(len(sample_history)):
93
            self.assertEqual(history.get(limit=limit), expected[-limit:])
94
            self.assertEqual(
95
                history.get('kamaki', limit=limit), expected[-limit:])
96

    
97
    def test_add(self):
98
        history = self.HCLASS(self.file.name)
99
        some_strings = ('a brick', 'two bricks', 'another brick', 'A wall!')
100
        for i, line in enumerate(some_strings):
101
            history.add(line)
102
            self.file.seek(0)
103
            self.assertEqual(
104
                self.file.read(), '\n'.join(some_strings[:(i + 1)]) + '\n')
105

    
106
    def test_clean(self):
107
        content = 'a brick\ntwo bricks\nanother brick\nA wall!\n'
108
        self.file.write(content)
109
        self.file.flush()
110
        self.file.seek(0)
111
        self.assertEqual(self.file.read(), content)
112
        history = self.HCLASS(self.file.name)
113
        history.clean()
114
        self.file.seek(0)
115
        self.assertEqual(self.file.read(), '')
116

    
117
    def test_retrieve(self):
118
        sample_history = (
119
            'kamaki history show\n',
120
            'kamaki file list\n',
121
            'kamaki touch pithos:f1\n',
122
            'kamaki file info pithos:f1\n',
123
            'current / last command is always excluded')
124
        self.file.write(''.join(sample_history))
125
        self.file.flush()
126

    
127
        history = self.HCLASS(self.file.name)
128
        self.assertRaises(ValueError, history.retrieve, 'must be number')
129
        self.assertRaises(TypeError, history.retrieve, [1, 2, 3])
130

    
131
        for i in (0, len(sample_history), -len(sample_history)):
132
            self.assertEqual(history.retrieve(i), None)
133
        for i in range(1, len(sample_history)):
134
            self.assertEqual(history.retrieve(i), sample_history[i - 1])
135
            self.assertEqual(history.retrieve(- i), sample_history[- i - 1])
136

    
137

    
138
class LoggerMethods(TestCase):
139

    
140
    class PseudoLogger(object):
141
        level = 'some level'
142
        _setLevel_calls = []
143
        _addHandler_calls = []
144

    
145
        def setLevel(self, *args):
146
            self._setLevel_calls.append(args)
147

    
148
        def addHandler(self, *args):
149
            self._addHandler_calls.append(args)
150

    
151
    class PseudoHandler(object):
152
        _setFormatter_calls = []
153

    
154
        def setFormatter(self, *args):
155
            self._setFormatter_calls.append(args)
156

    
157
    def setUp(self):
158
        from kamaki.cli.logger import LOG_FILE, _blacklist
159
        self.LF, self.BL = list(LOG_FILE), dict(_blacklist)
160

    
161
    def tearDown(self):
162
        self.PseudoLogger._setLevel_calls = []
163
        self.PseudoLogger._addHandler_calls = []
164
        self.PseudoLogger._setFormatter_calls = []
165
        from kamaki.cli.logger import LOG_FILE, _blacklist
166
        for e in LOG_FILE:
167
            LOG_FILE.pop()
168
        for e in self.LF:
169
            LOG_FILE.append(e)
170
        _blacklist.clear()
171
        _blacklist.update(self.BL)
172

    
173
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
174
    def test_deactivate(self, GL):
175
        from kamaki.cli.logger import deactivate, _blacklist
176
        self.assertEqual(_blacklist, {})
177
        deactivate('some logger')
178
        GL.assert_called_once_with('some logger')
179
        self.assertEqual(
180
            _blacklist.get('some logger', None), self.PseudoLogger.level)
181
        from logging import CRITICAL
182
        self.assertEqual(self.PseudoLogger._setLevel_calls[-1], (CRITICAL, ))
183

    
184
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
185
    def test_activate(self, GL):
186
        from kamaki.cli.logger import activate
187
        activate('another logger')
188
        GL.assert_called_once_with('another logger')
189
        self.assertEqual(
190
            self.PseudoLogger._setLevel_calls[-1], (self.PseudoLogger.level, ))
191

    
192
    def test_get_log_filename(self):
193
        from kamaki.cli.logger import get_log_filename, LOG_FILE
194
        f = NamedTemporaryFile()
195
        for e in LOG_FILE:
196
            LOG_FILE.pop()
197
        LOG_FILE.append(f.name)
198
        self.assertEqual(get_log_filename(), f.name)
199
        LOG_FILE.pop()
200
        LOG_FILE.append(2 * f.name)
201
        print('\n  Should print error msg here: ')
202
        self.assertEqual(get_log_filename(), None)
203

    
204
    def test_set_log_filename(self):
205
        from kamaki.cli.logger import set_log_filename, LOG_FILE
206
        for n in ('some name', 'some other name'):
207
            set_log_filename(n)
208
            self.assertEqual(LOG_FILE[0], n)
209

    
210
    @patch('kamaki.cli.logger.get_logger', return_value=PseudoLogger())
211
    @patch('kamaki.cli.logger.logging.Formatter', return_value='f0rm4t')
212
    @patch(
213
        'kamaki.cli.logger.logging.StreamHandler',
214
        return_value=PseudoHandler())
215
    @patch(
216
        'kamaki.cli.logger.logging.FileHandler',
217
        return_value=PseudoHandler())
218
    def test__add_logger(self, FH, SH, F, GL):
219
        from kamaki.cli.logger import _add_logger
220
        from logging import DEBUG
221
        stdf, cnt = '%(name)s\n %(message)s', 0
222
        for name, level, filename, fmt in product(
223
                ('my logger', ),
224
                ('my level', None),
225
                ('my filename', None),
226
                ('my fmt', None)):
227
            log = _add_logger(name, level, filename, fmt)
228
            self.assertTrue(isinstance(log, self.PseudoLogger))
229
            self.assertEqual(GL.mock_calls[-1], call(name))
230
            if filename:
231
                self.assertEqual(FH.mock_calls[-1], call(filename))
232
            else:
233
                self.assertEqual(SH.mock_calls[-1], call())
234
            self.assertEqual(F.mock_calls[-1], call(fmt or stdf))
235
            self.assertEqual(
236
                self.PseudoHandler._setFormatter_calls[-1], ('f0rm4t', ))
237
            cnt += 1
238
            self.assertEqual(len(self.PseudoLogger._addHandler_calls), cnt)
239
            h = self.PseudoLogger._addHandler_calls[-1]
240
            self.assertTrue(isinstance(h[0], self.PseudoHandler))
241
            l = self.PseudoLogger._setLevel_calls[-1]
242
            self.assertEqual(l, (level or DEBUG, ))
243

    
244
    @patch('kamaki.cli.logger.get_log_filename', return_value='my log fname')
245
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
246
    def test_add_file_logger(self, GL, GLF):
247
        from kamaki.cli.logger import add_file_logger
248
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
249
            GLFcount = GLF.call_count
250
            for name, level, filename in product(
251
                    ('my name'), ('my level', None), ('my filename', None)):
252
                self.assertEqual(add_file_logger(name, level, filename), 'AL')
253
                self.assertEqual(AL.mock_calls[-1], call(
254
                    name, level, filename or 'my log fname',
255
                    fmt='%(name)s(%(levelname)s) %(asctime)s\n\t%(message)s'))
256
                if filename:
257
                    self.assertEqual(GLFcount, GLF.call_count)
258
                else:
259
                    GLFcount = GLF.call_count
260
                    self.assertEqual(GLF.mock_calls[-1], call())
261
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
262
            self.assertEqual(add_file_logger('X'), 'my get logger ret')
263
            GL.assert_called_once_with('X')
264

    
265
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
266
    def test_add_stream_logger(self, GL):
267
        from kamaki.cli.logger import add_stream_logger
268
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
269
            for name, level, fmt in product(
270
                    ('my name'), ('my level', None), ('my fmt', None)):
271
                self.assertEqual(add_stream_logger(name, level, fmt), 'AL')
272
                self.assertEqual(AL.mock_calls[-1], call(name, level, fmt=fmt))
273
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
274
            self.assertEqual(add_stream_logger('X'), 'my get logger ret')
275
            GL.assert_called_once_with('X')
276

    
277
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
278
    def test_get_logger(self, GL):
279
        from kamaki.cli.logger import get_logger
280
        get_logger('my logger name')
281
        GL.assert_called_once_with('my logger name')
282

    
283

    
284
#  TestCase auxiliary methods
285

    
286
def runTestCase(cls, test_name, args=[], failure_collector=[]):
287
    """
288
    :param cls: (TestCase) a set of Tests
289

290
    :param test_name: (str)
291

292
    :param args: (list) these are prefixed with test_ and used as params when
293
        instantiating cls
294

295
    :param failure_collector: (list) collects info of test failures
296

297
    :returns: (int) total # of run tests
298
    """
299
    suite = TestSuite()
300
    if args:
301
        suite.addTest(cls('_'.join(['test'] + args)))
302
    else:
303
        suite.addTest(makeSuite(cls))
304
    print('* Test * %s *' % test_name)
305
    r = TextTestRunner(verbosity=2).run(suite)
306
    failure_collector += r.failures
307
    return r.testsRun
308

    
309

    
310
def get_test_classes(module=__import__(__name__), name=''):
311
    module_stack = [module]
312
    while module_stack:
313
        module = module_stack[-1]
314
        module_stack = module_stack[:-1]
315
        for objname, obj in getmembers(module):
316
            if (objname == name or not name):
317
                if isclass(obj) and objname != 'TestCase' and (
318
                        issubclass(obj, TestCase)):
319
                    yield (obj, objname)
320

    
321

    
322
def main(argv):
323
    found = False
324
    failure_collector = list()
325
    num_of_tests = 0
326
    for cls, name in get_test_classes(name=argv[1] if len(argv) > 1 else ''):
327
        found = True
328
        num_of_tests += runTestCase(cls, name, argv[2:], failure_collector)
329
    if not found:
330
        print('Test "%s" not found' % ' '.join(argv[1:]))
331
    else:
332
        for i, failure in enumerate(failure_collector):
333
            print('Failure %s: ' % (i + 1))
334
            for field in failure:
335
                print('\t%s' % field)
336
        print('\nTotal tests run: %s' % num_of_tests)
337
        print('Total failures: %s' % len(failure_collector))
338

    
339

    
340
if __name__ == '__main__':
341
    from sys import argv
342
    main(argv)