Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / test.py @ 38a79780

History | View | Annotate | Download (18.6 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import makeSuite, TestSuite, TextTestRunner, TestCase
35
from inspect import getmembers, isclass
36
from tempfile import NamedTemporaryFile
37
from mock import patch, call
38
from itertools import product
39

    
40
from kamaki.cli.command_tree.test import Command, CommandTree
41
from kamaki.cli.config.test import Config
42
from kamaki.cli.argument.test import (
43
    Argument, ConfigArgument, RuntimeConfigArgument, FlagArgument,
44
    ValueArgument, IntArgument, DateArgument, VersionArgument,
45
    RepeatableArgument, KeyValueArgument, ProgressBarArgument,
46
    ArgumentParseManager)
47
from kamaki.cli.utils.test import UtilsMethods
48

    
49

    
50
class History(TestCase):
51

    
52
    def setUp(self):
53
        from kamaki.cli.history import History as HClass
54
        self.HCLASS = HClass
55
        self.file = NamedTemporaryFile()
56

    
57
    def tearDown(self):
58
        self.file.close()
59

    
60
    def test__match(self):
61
        self.assertRaises(AttributeError, self.HCLASS._match, 'ok', 42)
62
        self.assertRaises(TypeError, self.HCLASS._match, 2.71, 'ok')
63
        for args, expected in (
64
                (('XXX', None), True),
65
                ((None, None), True),
66
                (('this line has some terms', 'some terms'), True),
67
                (('this line has some terms', 'some bad terms'), False),
68
                (('small line', 'not so small line terms'), False),
69
                ((['line', 'with', 'some', 'terms'], 'some terms'), True),
70
                ((['line', 'with', 'some terms'], 'some terms'), False)):
71
            self.assertEqual(self.HCLASS._match(*args), expected)
72

    
73
    def test_get(self):
74
        history = self.HCLASS(self.file.name)
75
        self.assertEqual(history.get(), [])
76

    
77
        sample_history = (
78
            'kamaki history show\n',
79
            'kamaki file list\n',
80
            'kamaki touch pithos:f1\n',
81
            'kamaki file info pithos:f1\n')
82
        self.file.write(''.join(sample_history))
83
        self.file.flush()
84

    
85
        expected = ['%s.  \t%s' % (
86
            i + 1, event) for i, event in enumerate(sample_history)]
87
        self.assertEqual(history.get(), expected)
88
        self.assertEqual(history.get('kamaki'), expected)
89
        self.assertEqual(history.get('file kamaki'), expected[1::2])
90
        self.assertEqual(history.get('pithos:f1'), expected[2:])
91
        self.assertEqual(history.get('touch pithos:f1'), expected[2:3])
92

    
93
        for limit in range(len(sample_history)):
94
            self.assertEqual(history.get(limit=limit), expected[-limit:])
95
            self.assertEqual(
96
                history.get('kamaki', limit=limit), expected[-limit:])
97

    
98
    def test_add(self):
99
        history = self.HCLASS(self.file.name)
100
        some_strings = ('a brick', 'two bricks', 'another brick', 'A wall!')
101
        for i, line in enumerate(some_strings):
102
            history.add(line)
103
            self.file.seek(0)
104
            self.assertEqual(
105
                self.file.read(), '\n'.join(some_strings[:(i + 1)]) + '\n')
106

    
107
    def test_clean(self):
108
        content = 'a brick\ntwo bricks\nanother brick\nA wall!\n'
109
        self.file.write(content)
110
        self.file.flush()
111
        self.file.seek(0)
112
        self.assertEqual(self.file.read(), content)
113
        history = self.HCLASS(self.file.name)
114
        history.clean()
115
        self.file.seek(0)
116
        self.assertEqual(self.file.read(), '')
117

    
118
    def test_retrieve(self):
119
        sample_history = (
120
            'kamaki history show\n',
121
            'kamaki file list\n',
122
            'kamaki touch pithos:f1\n',
123
            'kamaki file info pithos:f1\n',
124
            'current / last command is always excluded')
125
        self.file.write(''.join(sample_history))
126
        self.file.flush()
127

    
128
        history = self.HCLASS(self.file.name)
129
        self.assertRaises(ValueError, history.retrieve, 'must be number')
130
        self.assertRaises(TypeError, history.retrieve, [1, 2, 3])
131

    
132
        for i in (0, len(sample_history), -len(sample_history)):
133
            self.assertEqual(history.retrieve(i), None)
134
        for i in range(1, len(sample_history)):
135
            self.assertEqual(history.retrieve(i), sample_history[i - 1])
136
            self.assertEqual(history.retrieve(- i), sample_history[- i - 1])
137

    
138

    
139
class LoggerMethods(TestCase):
140

    
141
    class PseudoLogger(object):
142
        level = 'some level'
143
        _setLevel_calls = []
144
        _addHandler_calls = []
145

    
146
        def setLevel(self, *args):
147
            self._setLevel_calls.append(args)
148

    
149
        def addHandler(self, *args):
150
            self._addHandler_calls.append(args)
151

    
152
    class PseudoHandler(object):
153
        _setFormatter_calls = []
154

    
155
        def setFormatter(self, *args):
156
            self._setFormatter_calls.append(args)
157

    
158
    def setUp(self):
159
        from kamaki.cli.logger import LOG_FILE, _blacklist
160
        self.LF, self.BL = list(LOG_FILE), dict(_blacklist)
161

    
162
    def tearDown(self):
163
        self.PseudoLogger._setLevel_calls = []
164
        self.PseudoLogger._addHandler_calls = []
165
        self.PseudoLogger._setFormatter_calls = []
166
        from kamaki.cli.logger import LOG_FILE, _blacklist
167
        for e in LOG_FILE:
168
            LOG_FILE.pop()
169
        for e in self.LF:
170
            LOG_FILE.append(e)
171
        _blacklist.clear()
172
        _blacklist.update(self.BL)
173

    
174
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
175
    def test_deactivate(self, GL):
176
        from kamaki.cli.logger import deactivate, _blacklist
177
        self.assertEqual(_blacklist, {})
178
        deactivate('some logger')
179
        GL.assert_called_once_with('some logger')
180
        self.assertEqual(
181
            _blacklist.get('some logger', None), self.PseudoLogger.level)
182
        from logging import CRITICAL
183
        self.assertEqual(self.PseudoLogger._setLevel_calls[-1], (CRITICAL, ))
184

    
185
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
186
    def test_activate(self, GL):
187
        from kamaki.cli.logger import activate
188
        activate('another logger')
189
        GL.assert_called_once_with('another logger')
190
        self.assertEqual(
191
            self.PseudoLogger._setLevel_calls[-1], (self.PseudoLogger.level, ))
192

    
193
    def test_get_log_filename(self):
194
        from kamaki.cli.logger import get_log_filename, LOG_FILE
195
        f = NamedTemporaryFile()
196
        for e in LOG_FILE:
197
            LOG_FILE.pop()
198
        LOG_FILE.append(f.name)
199
        self.assertEqual(get_log_filename(), f.name)
200
        LOG_FILE.pop()
201
        LOG_FILE.append(2 * f.name)
202
        print('\n  Should print error msg here: ')
203
        self.assertEqual(get_log_filename(), None)
204

    
205
    def test_set_log_filename(self):
206
        from kamaki.cli.logger import set_log_filename, LOG_FILE
207
        for n in ('some name', 'some other name'):
208
            set_log_filename(n)
209
            self.assertEqual(LOG_FILE[0], n)
210

    
211
    @patch('kamaki.cli.logger.get_logger', return_value=PseudoLogger())
212
    @patch('kamaki.cli.logger.logging.Formatter', return_value='f0rm4t')
213
    @patch(
214
        'kamaki.cli.logger.logging.StreamHandler',
215
        return_value=PseudoHandler())
216
    @patch(
217
        'kamaki.cli.logger.logging.FileHandler',
218
        return_value=PseudoHandler())
219
    def test__add_logger(self, FH, SH, F, GL):
220
        from kamaki.cli.logger import _add_logger
221
        from logging import DEBUG
222
        stdf, cnt = '%(name)s\n %(message)s', 0
223
        for name, level, filename, fmt in product(
224
                ('my logger', ),
225
                ('my level', None),
226
                ('my filename', None),
227
                ('my fmt', None)):
228
            log = _add_logger(name, level, filename, fmt)
229
            self.assertTrue(isinstance(log, self.PseudoLogger))
230
            self.assertEqual(GL.mock_calls[-1], call(name))
231
            if filename:
232
                self.assertEqual(FH.mock_calls[-1], call(filename))
233
            else:
234
                self.assertEqual(SH.mock_calls[-1], call())
235
            self.assertEqual(F.mock_calls[-1], call(fmt or stdf))
236
            self.assertEqual(
237
                self.PseudoHandler._setFormatter_calls[-1], ('f0rm4t', ))
238
            cnt += 1
239
            self.assertEqual(len(self.PseudoLogger._addHandler_calls), cnt)
240
            h = self.PseudoLogger._addHandler_calls[-1]
241
            self.assertTrue(isinstance(h[0], self.PseudoHandler))
242
            l = self.PseudoLogger._setLevel_calls[-1]
243
            self.assertEqual(l, (level or DEBUG, ))
244

    
245
    @patch('kamaki.cli.logger.get_log_filename', return_value='my log fname')
246
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
247
    def test_add_file_logger(self, GL, GLF):
248
        from kamaki.cli.logger import add_file_logger
249
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
250
            GLFcount = GLF.call_count
251
            for name, level, filename in product(
252
                    ('my name'), ('my level', None), ('my filename', None)):
253
                self.assertEqual(add_file_logger(name, level, filename), 'AL')
254
                self.assertEqual(AL.mock_calls[-1], call(
255
                    name, level, filename or 'my log fname',
256
                    fmt='%(name)s(%(levelname)s) %(asctime)s\n\t%(message)s'))
257
                if filename:
258
                    self.assertEqual(GLFcount, GLF.call_count)
259
                else:
260
                    GLFcount = GLF.call_count
261
                    self.assertEqual(GLF.mock_calls[-1], call())
262
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
263
            self.assertEqual(add_file_logger('X'), 'my get logger ret')
264
            GL.assert_called_once_with('X')
265

    
266
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
267
    def test_add_stream_logger(self, GL):
268
        from kamaki.cli.logger import add_stream_logger
269
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
270
            for name, level, fmt in product(
271
                    ('my name'), ('my level', None), ('my fmt', None)):
272
                self.assertEqual(add_stream_logger(name, level, fmt), 'AL')
273
                self.assertEqual(AL.mock_calls[-1], call(name, level, fmt=fmt))
274
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
275
            self.assertEqual(add_stream_logger('X'), 'my get logger ret')
276
            GL.assert_called_once_with('X')
277

    
278
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
279
    def test_get_logger(self, GL):
280
        from kamaki.cli.logger import get_logger
281
        get_logger('my logger name')
282
        GL.assert_called_once_with('my logger name')
283

    
284

    
285
_RET = None
286

    
287

    
288
class PseudoException(object):
289

    
290
    def __init__(self, *args):
291
        global _RET
292
        _RET = args
293

    
294

    
295
class CLIError(TestCase):
296

    
297
    @patch('__builtin__.super', return_value=PseudoException())
298
    def test___init__(self, S):
299
        from kamaki.cli.errors import CLIError
300
        global _RET
301
        for message, details, importance in (
302
                ('some msg', [], 0),
303
                ('some msg\n', 'details', 0),
304
                ('some msg', ['details1', 'details2'], 10)):
305
            clie = CLIError(message, details, importance)
306
            self.assertEqual(S.mock_calls[-1], call(CLIError, clie))
307
            self.assertEqual(_RET[0], (message + '\n') if (
308
                message and not message.endswith('\n')) else message)
309
            self.assertEqual(clie.details, (list(details) if (
310
                isinstance(details, list)) else ['%s' % details]) if (
311
                    details) else [])
312
            self.assertEqual(clie.importance, int(importance))
313
        clie = CLIError(message, details, 'non int')
314
        self.assertEqual(clie.importance, 0)
315

    
316
    def test_raiseCLIError(self):
317
        from kamaki.cli.errors import raiseCLIError, CLIError
318
        for err, message, importance, details in (
319
                (Exception('msg'), '', 0, []),
320
                (Exception('msg'), 'orther msg', 0, []),
321
                (Exception('msg'), 'orther msg', 0, ['d1', 'd2']),
322
                (Exception('msg'), '', 10, []),
323
                (Exception('msg'), '', None, []),
324
                (CLIError('some msg'), '', None, ['d1', 'd2'])
325
            ):
326
            try:
327
                raiseCLIError(err, message, importance, details)
328
            except CLIError as clie:
329
                exp_msg = '%s' % (message or err)
330
                exp_msg += '' if exp_msg.endswith('\n') else '\n'
331
                self.assertEqual('%s' % clie, exp_msg)
332
                self.assertEqual(clie.importance, importance or 0)
333
                exp_d = list(details) if isinstance(details, list) else [
334
                    '%s' % (details or '')]
335
                base_msg = '%s' % err
336
                if message and base_msg != message:
337
                    exp_d.append(base_msg)
338
                self.assertEqual(clie.details, exp_d)
339

    
340

    
341
class CLIUnimplemented(TestCase):
342

    
343
    def test___init__(self):
344
        from kamaki.cli.errors import CLIUnimplemented
345
        cliu = CLIUnimplemented()
346
        self.assertEqual(
347
            '%s' % cliu,
348
            'I \'M SORRY, DAVE.\nI \'M AFRAID I CAN\'T DO THAT.\n')
349
        self.assertEqual(cliu.details, [
350
            '      _        |',
351
            '   _-- --_     |',
352
            '  --     --    |',
353
            ' --   .   --   |',
354
            ' -_       _-   |',
355
            '   -_   _-     |',
356
            '      -        |'])
357
        self.assertEqual(cliu.importance, 3)
358

    
359

    
360
class CLIBaseUrlError(TestCase):
361

    
362
    def test___init__(self):
363
        from kamaki.cli.errors import CLIBaseUrlError
364
        for service in ('', 'some service'):
365
            clibue = CLIBaseUrlError(service=service)
366
            self.assertEqual('%s' % clibue, 'No URL for %s\n' % service)
367
            self.assertEqual(clibue.details, [
368
                'Two ways to resolve this:',
369
                '(Use the correct cloud name, instead of "default")',
370
                'A. (recommended) Let kamaki discover endpoint URLs for all',
371
                'services by setting a single Authentication URL and token:',
372
                '  /config set cloud.default.url <AUTH_URL>',
373
                '  /config set cloud.default.token <t0k3n>',
374
                'B. (advanced users) Explicitly set an %s endpoint URL' % (
375
                    service.upper()),
376
                'Note: URL option has a higher priority, so delete it to',
377
                'make that work',
378
                '  /config delete cloud.default.url',
379
                '  /config set cloud.%s.url <%s_URL>' % (
380
                    service, service.upper())])
381
            self.assertEqual(clibue.importance, 2)
382

    
383

    
384
class CLISyntaxError(TestCase):
385

    
386
    def test___init__(self):
387
        from kamaki.cli.errors import CLISyntaxError
388
        clise = CLISyntaxError()
389
        self.assertEqual('%s' % clise, 'Syntax Error\n')
390
        self.assertEqual(clise.details, [])
391
        self.assertEqual(clise.importance, 1)
392

    
393

    
394
class CLIInvalidArgument(TestCase):
395

    
396
    def test___init__(self):
397
        from kamaki.cli.errors import CLIInvalidArgument
398
        cliia = CLIInvalidArgument()
399
        self.assertEqual('%s' % cliia, 'Invalid Argument\n')
400
        self.assertEqual(cliia.details, [])
401
        self.assertEqual(cliia.importance, 1)
402

    
403

    
404
class CLIUnknownCommand(TestCase):
405

    
406
    def test___init__(self):
407
        from kamaki.cli.errors import CLIUnknownCommand
408
        cliec = CLIUnknownCommand()
409
        self.assertEqual('%s' % cliec, 'Unknown Command\n')
410
        self.assertEqual(cliec.details, [])
411
        self.assertEqual(cliec.importance, 1)
412

    
413

    
414
class CLICmdSpecError(TestCase):
415

    
416
    def test___init__(self):
417
        from kamaki.cli.errors import CLICmdSpecError
418
        clicse = CLICmdSpecError()
419
        self.assertEqual('%s' % clicse, 'Command Specification Error\n')
420
        self.assertEqual(clicse.details, [])
421
        self.assertEqual(clicse.importance, 0)
422

    
423

    
424
#  TestCase auxiliary methods
425

    
426
def runTestCase(cls, test_name, args=[], failure_collector=[]):
427
    """
428
    :param cls: (TestCase) a set of Tests
429

430
    :param test_name: (str)
431

432
    :param args: (list) these are prefixed with test_ and used as params when
433
        instantiating cls
434

435
    :param failure_collector: (list) collects info of test failures
436

437
    :returns: (int) total # of run tests
438
    """
439
    suite = TestSuite()
440
    if args:
441
        suite.addTest(cls('_'.join(['test'] + args)))
442
    else:
443
        suite.addTest(makeSuite(cls))
444
    print('* Test * %s *' % test_name)
445
    r = TextTestRunner(verbosity=2).run(suite)
446
    failure_collector += r.failures
447
    return r.testsRun
448

    
449

    
450
def get_test_classes(module=__import__(__name__), name=''):
451
    module_stack = [module]
452
    while module_stack:
453
        module = module_stack[-1]
454
        module_stack = module_stack[:-1]
455
        for objname, obj in getmembers(module):
456
            if (objname == name or not name):
457
                if isclass(obj) and objname != 'TestCase' and (
458
                        issubclass(obj, TestCase)):
459
                    yield (obj, objname)
460

    
461

    
462
def main(argv):
463
    found = False
464
    failure_collector = list()
465
    num_of_tests = 0
466
    for cls, name in get_test_classes(name=argv[1] if len(argv) > 1 else ''):
467
        found = True
468
        num_of_tests += runTestCase(cls, name, argv[2:], failure_collector)
469
    if not found:
470
        print('Test "%s" not found' % ' '.join(argv[1:]))
471
    else:
472
        for i, failure in enumerate(failure_collector):
473
            print('Failure %s: ' % (i + 1))
474
            for field in failure:
475
                print('\t%s' % field)
476
        print('\nTotal tests run: %s' % num_of_tests)
477
        print('Total failures: %s' % len(failure_collector))
478

    
479

    
480
if __name__ == '__main__':
481
    from sys import argv
482
    main(argv)