Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / test.py @ 4769da6b

History | View | Annotate | Download (13.4 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import makeSuite, TestSuite, TextTestRunner, TestCase
35
from inspect import getmembers, isclass
36
from tempfile import NamedTemporaryFile
37
from mock import patch, call
38
from itertools import product
39

    
40
from kamaki.cli.command_tree.test import Command, CommandTree
41
from kamaki.cli.argument.test import (
42
    Argument, ConfigArgument, RuntimeConfigArgument, FlagArgument,
43
    ValueArgument, IntArgument, DateArgument, VersionArgument,
44
    KeyValueArgument, ProgressBarArgument, ArgumentParseManager)
45

    
46

    
47
class History(TestCase):
48

    
49
    def setUp(self):
50
        from kamaki.cli.history import History as HClass
51
        self.HCLASS = HClass
52
        self.file = NamedTemporaryFile()
53

    
54
    def tearDown(self):
55
        self.file.close()
56

    
57
    def test__match(self):
58
        self.assertRaises(AttributeError, self.HCLASS._match, 'ok', 42)
59
        self.assertRaises(TypeError, self.HCLASS._match, 2.71, 'ok')
60
        for args, expected in (
61
                (('XXX', None), True),
62
                ((None, None), True),
63
                (('this line has some terms', 'some terms'), True),
64
                (('this line has some terms', 'some bad terms'), False),
65
                (('small line', 'not so small line terms'), False),
66
                ((['line', 'with', 'some', 'terms'], 'some terms'), True),
67
                ((['line', 'with', 'some terms'], 'some terms'), False)):
68
            self.assertEqual(self.HCLASS._match(*args), expected)
69

    
70
    def test_get(self):
71
        history = self.HCLASS(self.file.name)
72
        self.assertEqual(history.get(), [])
73

    
74
        sample_history = (
75
            'kamaki history show\n',
76
            'kamaki file list\n',
77
            'kamaki touch pithos:f1\n',
78
            'kamaki file info pithos:f1\n')
79
        self.file.write(''.join(sample_history))
80
        self.file.flush()
81

    
82
        expected = ['%s.  \t%s' % (
83
            i + 1, event) for i, event in enumerate(sample_history)]
84
        self.assertEqual(history.get(), expected)
85
        self.assertEqual(history.get('kamaki'), expected)
86
        self.assertEqual(history.get('file kamaki'), expected[1::2])
87
        self.assertEqual(history.get('pithos:f1'), expected[2:])
88
        self.assertEqual(history.get('touch pithos:f1'), expected[2:3])
89

    
90
        for limit in range(len(sample_history)):
91
            self.assertEqual(history.get(limit=limit), expected[-limit:])
92
            self.assertEqual(
93
                history.get('kamaki', limit=limit), expected[-limit:])
94

    
95
    def test_add(self):
96
        history = self.HCLASS(self.file.name)
97
        some_strings = ('a brick', 'two bricks', 'another brick', 'A wall!')
98
        for i, line in enumerate(some_strings):
99
            history.add(line)
100
            self.file.seek(0)
101
            self.assertEqual(
102
                self.file.read(), '\n'.join(some_strings[:(i + 1)]) + '\n')
103

    
104
    def test_clean(self):
105
        content = 'a brick\ntwo bricks\nanother brick\nA wall!\n'
106
        self.file.write(content)
107
        self.file.flush()
108
        self.file.seek(0)
109
        self.assertEqual(self.file.read(), content)
110
        history = self.HCLASS(self.file.name)
111
        history.clean()
112
        self.file.seek(0)
113
        self.assertEqual(self.file.read(), '')
114

    
115
    def test_retrieve(self):
116
        sample_history = (
117
            'kamaki history show\n',
118
            'kamaki file list\n',
119
            'kamaki touch pithos:f1\n',
120
            'kamaki file info pithos:f1\n',
121
            'current / last command is always excluded')
122
        self.file.write(''.join(sample_history))
123
        self.file.flush()
124

    
125
        history = self.HCLASS(self.file.name)
126
        self.assertRaises(ValueError, history.retrieve, 'must be number')
127
        self.assertRaises(TypeError, history.retrieve, [1, 2, 3])
128

    
129
        for i in (0, len(sample_history), -len(sample_history)):
130
            self.assertEqual(history.retrieve(i), None)
131
        for i in range(1, len(sample_history)):
132
            self.assertEqual(history.retrieve(i), sample_history[i - 1])
133
            self.assertEqual(history.retrieve(- i), sample_history[- i - 1])
134

    
135

    
136
class LoggerMethods(TestCase):
137

    
138
    class PseudoLogger(object):
139
        level = 'some level'
140
        _setLevel_calls = []
141
        _addHandler_calls = []
142

    
143
        def setLevel(self, *args):
144
            self._setLevel_calls.append(args)
145

    
146
        def addHandler(self, *args):
147
            self._addHandler_calls.append(args)
148

    
149
    class PseudoHandler(object):
150
        _setFormatter_calls = []
151

    
152
        def setFormatter(self, *args):
153
            self._setFormatter_calls.append(args)
154

    
155
    def setUp(self):
156
        from kamaki.cli.logger import LOG_FILE, _blacklist
157
        self.LF, self.BL = list(LOG_FILE), dict(_blacklist)
158

    
159
    def tearDown(self):
160
        self.PseudoLogger._setLevel_calls = []
161
        self.PseudoLogger._addHandler_calls = []
162
        self.PseudoLogger._setFormatter_calls = []
163
        from kamaki.cli.logger import LOG_FILE, _blacklist
164
        for e in LOG_FILE:
165
            LOG_FILE.pop()
166
        for e in self.LF:
167
            LOG_FILE.append(e)
168
        _blacklist.clear()
169
        _blacklist.update(self.BL)
170

    
171
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
172
    def test_deactivate(self, GL):
173
        from kamaki.cli.logger import deactivate, _blacklist
174
        self.assertEqual(_blacklist, {})
175
        deactivate('some logger')
176
        GL.assert_called_once_with('some logger')
177
        self.assertEqual(
178
            _blacklist.get('some logger', None), self.PseudoLogger.level)
179
        from logging import CRITICAL
180
        self.assertEqual(self.PseudoLogger._setLevel_calls[-1], (CRITICAL, ))
181

    
182
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
183
    def test_activate(self, GL):
184
        from kamaki.cli.logger import activate
185
        activate('another logger')
186
        GL.assert_called_once_with('another logger')
187
        self.assertEqual(
188
            self.PseudoLogger._setLevel_calls[-1], (self.PseudoLogger.level, ))
189

    
190
    def test_get_log_filename(self):
191
        from kamaki.cli.logger import get_log_filename, LOG_FILE
192
        f = NamedTemporaryFile()
193
        for e in LOG_FILE:
194
            LOG_FILE.pop()
195
        LOG_FILE.append(f.name)
196
        self.assertEqual(get_log_filename(), f.name)
197
        LOG_FILE.pop()
198
        LOG_FILE.append(2 * f.name)
199
        print('\n  Should print error msg here: ')
200
        self.assertEqual(get_log_filename(), None)
201

    
202
    def test_set_log_filename(self):
203
        from kamaki.cli.logger import set_log_filename, LOG_FILE
204
        for n in ('some name', 'some other name'):
205
            set_log_filename(n)
206
            self.assertEqual(LOG_FILE[0], n)
207

    
208
    @patch('kamaki.cli.logger.get_logger', return_value=PseudoLogger())
209
    @patch('kamaki.cli.logger.logging.Formatter', return_value='f0rm4t')
210
    @patch(
211
        'kamaki.cli.logger.logging.StreamHandler',
212
        return_value=PseudoHandler())
213
    @patch(
214
        'kamaki.cli.logger.logging.FileHandler',
215
        return_value=PseudoHandler())
216
    def test__add_logger(self, FH, SH, F, GL):
217
        from kamaki.cli.logger import _add_logger
218
        from logging import DEBUG
219
        stdf, cnt = '%(name)s\n %(message)s', 0
220
        for name, level, filename, fmt in product(
221
                ('my logger', ),
222
                ('my level', None),
223
                ('my filename', None),
224
                ('my fmt', None)):
225
            log = _add_logger(name, level, filename, fmt)
226
            self.assertTrue(isinstance(log, self.PseudoLogger))
227
            self.assertEqual(GL.mock_calls[-1], call(name))
228
            if filename:
229
                self.assertEqual(FH.mock_calls[-1], call(filename))
230
            else:
231
                self.assertEqual(SH.mock_calls[-1], call())
232
            self.assertEqual(F.mock_calls[-1], call(fmt or stdf))
233
            self.assertEqual(
234
                self.PseudoHandler._setFormatter_calls[-1], ('f0rm4t', ))
235
            cnt += 1
236
            self.assertEqual(len(self.PseudoLogger._addHandler_calls), cnt)
237
            h = self.PseudoLogger._addHandler_calls[-1]
238
            self.assertTrue(isinstance(h[0], self.PseudoHandler))
239
            l = self.PseudoLogger._setLevel_calls[-1]
240
            self.assertEqual(l, (level or DEBUG, ))
241

    
242
    @patch('kamaki.cli.logger.get_log_filename', return_value='my log fname')
243
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
244
    def test_add_file_logger(self, GL, GLF):
245
        from kamaki.cli.logger import add_file_logger
246
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
247
            GLFcount = GLF.call_count
248
            for name, level, filename in product(
249
                    ('my name'), ('my level', None), ('my filename', None)):
250
                self.assertEqual(add_file_logger(name, level, filename), 'AL')
251
                self.assertEqual(AL.mock_calls[-1], call(
252
                    name, level, filename or 'my log fname',
253
                    fmt='%(name)s(%(levelname)s) %(asctime)s\n\t%(message)s'))
254
                if filename:
255
                    self.assertEqual(GLFcount, GLF.call_count)
256
                else:
257
                    GLFcount = GLF.call_count
258
                    self.assertEqual(GLF.mock_calls[-1], call())
259
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
260
            self.assertEqual(add_file_logger('X'), 'my get logger ret')
261
            GL.assert_called_once_with('X')
262

    
263
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
264
    def test_add_stream_logger(self, GL):
265
        from kamaki.cli.logger import add_stream_logger
266
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
267
            for name, level, fmt in product(
268
                    ('my name'), ('my level', None), ('my fmt', None)):
269
                self.assertEqual(add_stream_logger(name, level, fmt), 'AL')
270
                self.assertEqual(AL.mock_calls[-1], call(name, level, fmt=fmt))
271
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
272
            self.assertEqual(add_stream_logger('X'), 'my get logger ret')
273
            GL.assert_called_once_with('X')
274

    
275
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
276
    def test_get_logger(self, GL):
277
        from kamaki.cli.logger import get_logger
278
        get_logger('my logger name')
279
        GL.assert_called_once_with('my logger name')
280

    
281

    
282
#  TestCase auxiliary methods
283

    
284
def runTestCase(cls, test_name, args=[], failure_collector=[]):
285
    """
286
    :param cls: (TestCase) a set of Tests
287

288
    :param test_name: (str)
289

290
    :param args: (list) these are prefixed with test_ and used as params when
291
        instantiating cls
292

293
    :param failure_collector: (list) collects info of test failures
294

295
    :returns: (int) total # of run tests
296
    """
297
    suite = TestSuite()
298
    if args:
299
        suite.addTest(cls('_'.join(['test'] + args)))
300
    else:
301
        suite.addTest(makeSuite(cls))
302
    print('* Test * %s *' % test_name)
303
    r = TextTestRunner(verbosity=2).run(suite)
304
    failure_collector += r.failures
305
    return r.testsRun
306

    
307

    
308
def get_test_classes(module=__import__(__name__), name=''):
309
    module_stack = [module]
310
    while module_stack:
311
        module = module_stack[-1]
312
        module_stack = module_stack[:-1]
313
        for objname, obj in getmembers(module):
314
            if (objname == name or not name):
315
                if isclass(obj) and objname != 'TestCase' and (
316
                        issubclass(obj, TestCase)):
317
                    yield (obj, objname)
318

    
319

    
320
def main(argv):
321
    found = False
322
    failure_collector = list()
323
    num_of_tests = 0
324
    for cls, name in get_test_classes(name=argv[1] if len(argv) > 1 else ''):
325
        found = True
326
        num_of_tests += runTestCase(cls, name, argv[2:], failure_collector)
327
    if not found:
328
        print('Test "%s" not found' % ' '.join(argv[1:]))
329
    else:
330
        for i, failure in enumerate(failure_collector):
331
            print('Failure %s: ' % (i + 1))
332
            for field in failure:
333
                print('\t%s' % field)
334
        print('\nTotal tests run: %s' % num_of_tests)
335
        print('Total failures: %s' % len(failure_collector))
336

    
337

    
338
if __name__ == '__main__':
339
    from sys import argv
340
    main(argv)