Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / test.py @ edaf3ba6

History | View | Annotate | Download (18.6 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from unittest import makeSuite, TestSuite, TextTestRunner, TestCase
35
from inspect import getmembers, isclass
36
from tempfile import NamedTemporaryFile
37
from mock import patch, call
38
from itertools import product
39

    
40
from kamaki.cli.command_tree.test import Command, CommandTree
41
from kamaki.cli.config.test import Config
42
from kamaki.cli.argument.test import (
43
    Argument, ConfigArgument, RuntimeConfigArgument, FlagArgument,
44
    ValueArgument, IntArgument, DateArgument, VersionArgument,
45
    RepeatableArgument, KeyValueArgument, ProgressBarArgument,
46
    ArgumentParseManager)
47
from kamaki.cli.utils.test import UtilsMethods
48

    
49

    
50
class History(TestCase):
51

    
52
    def setUp(self):
53
        from kamaki.cli.history import History as HClass
54
        self.HCLASS = HClass
55
        self.file = NamedTemporaryFile()
56

    
57
    def tearDown(self):
58
        self.file.close()
59

    
60
    def test__match(self):
61
        self.assertRaises(AttributeError, self.HCLASS._match, 'ok', 42)
62
        self.assertRaises(TypeError, self.HCLASS._match, 2.71, 'ok')
63
        for args, expected in (
64
                (('XXX', None), True),
65
                ((None, None), True),
66
                (('this line has some terms', 'some terms'), True),
67
                (('this line has some terms', 'some bad terms'), False),
68
                (('small line', 'not so small line terms'), False),
69
                ((['line', 'with', 'some', 'terms'], 'some terms'), True),
70
                ((['line', 'with', 'some terms'], 'some terms'), False)):
71
            self.assertEqual(self.HCLASS._match(*args), expected)
72

    
73
    def test_get(self):
74
        history = self.HCLASS(self.file.name)
75
        self.assertEqual(history.get(), [])
76

    
77
        sample_history = (
78
            'kamaki history show\n',
79
            'kamaki file list\n',
80
            'kamaki touch pithos:f1\n',
81
            'kamaki file info pithos:f1\n')
82
        self.file.write(''.join(sample_history))
83
        self.file.flush()
84

    
85
        expected = ['%s.  \t%s' % (
86
            i + 1, event) for i, event in enumerate(sample_history)]
87
        self.assertEqual(history.get(), expected)
88
        self.assertEqual(history.get('kamaki'), expected)
89
        self.assertEqual(history.get('file kamaki'), expected[1::2])
90
        self.assertEqual(history.get('pithos:f1'), expected[2:])
91
        self.assertEqual(history.get('touch pithos:f1'), expected[2:3])
92

    
93
        for limit in range(len(sample_history)):
94
            self.assertEqual(history.get(limit=limit), expected[-limit:])
95
            self.assertEqual(
96
                history.get('kamaki', limit=limit), expected[-limit:])
97

    
98
    def test_add(self):
99
        history = self.HCLASS(self.file.name)
100
        some_strings = ('a brick', 'two bricks', 'another brick', 'A wall!')
101
        for i, line in enumerate(some_strings):
102
            history.add(line)
103
            self.file.seek(0)
104
            self.assertEqual(
105
                self.file.read(), '\n'.join(some_strings[:(i + 1)]) + '\n')
106

    
107
    def test_empty(self):
108
        content = 'a brick\ntwo bricks\nanother brick\nA wall!\n'
109
        self.file.write(content)
110
        self.file.flush()
111
        self.file.seek(0)
112
        self.assertEqual(self.file.read(), content)
113
        history = self.HCLASS(self.file.name)
114
        history.empty()
115
        self.file.seek(0)
116
        self.assertEqual(self.file.read(), '')
117

    
118
    def test_retrieve(self):
119
        sample_history = (
120
            'kamaki history show\n',
121
            'kamaki file list\n',
122
            'kamaki touch pithos:f1\n',
123
            'kamaki file info pithos:f1\n',
124
            'current / last command is always excluded')
125
        self.file.write(''.join(sample_history))
126
        self.file.flush()
127

    
128
        history = self.HCLASS(self.file.name)
129
        retrieve = history.__getitem__
130
        self.assertRaises(ValueError, retrieve, 'must be number')
131
        self.assertRaises(TypeError, retrieve, [1, 2, 3])
132

    
133
        for i in (0, len(sample_history), -len(sample_history)):
134
            self.assertEqual(history[i], None)
135
        for i in range(1, len(sample_history)):
136
            self.assertEqual(history[i], sample_history[i - 1])
137
            self.assertEqual(history[- i], sample_history[- i - 1])
138

    
139

    
140
class LoggerMethods(TestCase):
141

    
142
    class PseudoLogger(object):
143
        level = 'some level'
144
        _setLevel_calls = []
145
        _addHandler_calls = []
146

    
147
        def setLevel(self, *args):
148
            self._setLevel_calls.append(args)
149

    
150
        def addHandler(self, *args):
151
            self._addHandler_calls.append(args)
152

    
153
    class PseudoHandler(object):
154
        _setFormatter_calls = []
155

    
156
        def setFormatter(self, *args):
157
            self._setFormatter_calls.append(args)
158

    
159
    def setUp(self):
160
        from kamaki.cli.logger import LOG_FILE, _blacklist
161
        self.LF, self.BL = list(LOG_FILE), dict(_blacklist)
162

    
163
    def tearDown(self):
164
        self.PseudoLogger._setLevel_calls = []
165
        self.PseudoLogger._addHandler_calls = []
166
        self.PseudoLogger._setFormatter_calls = []
167
        from kamaki.cli.logger import LOG_FILE, _blacklist
168
        for e in LOG_FILE:
169
            LOG_FILE.pop()
170
        for e in self.LF:
171
            LOG_FILE.append(e)
172
        _blacklist.clear()
173
        _blacklist.update(self.BL)
174

    
175
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
176
    def test_deactivate(self, GL):
177
        from kamaki.cli.logger import deactivate, _blacklist
178
        self.assertEqual(_blacklist, {})
179
        deactivate('some logger')
180
        GL.assert_called_once_with('some logger')
181
        self.assertEqual(
182
            _blacklist.get('some logger', None), self.PseudoLogger.level)
183
        from logging import CRITICAL
184
        self.assertEqual(self.PseudoLogger._setLevel_calls[-1], (CRITICAL, ))
185

    
186
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
187
    def test_activate(self, GL):
188
        from kamaki.cli.logger import activate
189
        activate('another logger')
190
        GL.assert_called_once_with('another logger')
191
        self.assertEqual(
192
            self.PseudoLogger._setLevel_calls[-1], (self.PseudoLogger.level, ))
193

    
194
    def test_get_log_filename(self):
195
        from kamaki.cli.logger import get_log_filename, LOG_FILE
196
        f = NamedTemporaryFile()
197
        for e in LOG_FILE:
198
            LOG_FILE.pop()
199
        LOG_FILE.append(f.name)
200
        self.assertEqual(get_log_filename(), f.name)
201
        LOG_FILE.pop()
202
        LOG_FILE.append(2 * f.name)
203
        print('\n  Should print error msg here: ')
204
        self.assertEqual(get_log_filename(), None)
205

    
206
    def test_set_log_filename(self):
207
        from kamaki.cli.logger import set_log_filename, LOG_FILE
208
        for n in ('some name', 'some other name'):
209
            set_log_filename(n)
210
            self.assertEqual(LOG_FILE[0], n)
211

    
212
    @patch('kamaki.cli.logger.get_logger', return_value=PseudoLogger())
213
    @patch('kamaki.cli.logger.logging.Formatter', return_value='f0rm4t')
214
    @patch(
215
        'kamaki.cli.logger.logging.StreamHandler',
216
        return_value=PseudoHandler())
217
    @patch(
218
        'kamaki.cli.logger.logging.FileHandler',
219
        return_value=PseudoHandler())
220
    def test__add_logger(self, FH, SH, F, GL):
221
        from kamaki.cli.logger import _add_logger
222
        from logging import DEBUG
223
        stdf, cnt = '%(name)s\n %(message)s', 0
224
        for name, level, filename, fmt in product(
225
                ('my logger', ),
226
                ('my level', None),
227
                ('my filename', None),
228
                ('my fmt', None)):
229
            log = _add_logger(name, level, filename, fmt)
230
            self.assertTrue(isinstance(log, self.PseudoLogger))
231
            self.assertEqual(GL.mock_calls[-1], call(name))
232
            if filename:
233
                self.assertEqual(FH.mock_calls[-1], call(filename))
234
            else:
235
                self.assertEqual(SH.mock_calls[-1], call())
236
            self.assertEqual(F.mock_calls[-1], call(fmt or stdf))
237
            self.assertEqual(
238
                self.PseudoHandler._setFormatter_calls[-1], ('f0rm4t', ))
239
            cnt += 1
240
            self.assertEqual(len(self.PseudoLogger._addHandler_calls), cnt)
241
            h = self.PseudoLogger._addHandler_calls[-1]
242
            self.assertTrue(isinstance(h[0], self.PseudoHandler))
243
            l = self.PseudoLogger._setLevel_calls[-1]
244
            self.assertEqual(l, (level or DEBUG, ))
245

    
246
    @patch('kamaki.cli.logger.get_log_filename', return_value='my log fname')
247
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
248
    def test_add_file_logger(self, GL, GLF):
249
        from kamaki.cli.logger import add_file_logger
250
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
251
            GLFcount = GLF.call_count
252
            for name, level, filename in product(
253
                    ('my name'), ('my level', None), ('my filename', None)):
254
                self.assertEqual(add_file_logger(name, level, filename), 'AL')
255
                self.assertEqual(AL.mock_calls[-1], call(
256
                    name, level, filename or 'my log fname',
257
                    fmt='%(name)s(%(levelname)s) %(asctime)s\n\t%(message)s'))
258
                if filename:
259
                    self.assertEqual(GLFcount, GLF.call_count)
260
                else:
261
                    GLFcount = GLF.call_count
262
                    self.assertEqual(GLF.mock_calls[-1], call())
263
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
264
            self.assertEqual(add_file_logger('X'), 'my get logger ret')
265
            GL.assert_called_once_with('X')
266

    
267
    @patch('kamaki.cli.logger.get_logger', return_value='my get logger ret')
268
    def test_add_stream_logger(self, GL):
269
        from kamaki.cli.logger import add_stream_logger
270
        with patch('kamaki.cli.logger._add_logger', return_value='AL') as AL:
271
            for name, level, fmt in product(
272
                    ('my name'), ('my level', None), ('my fmt', None)):
273
                self.assertEqual(add_stream_logger(name, level, fmt), 'AL')
274
                self.assertEqual(AL.mock_calls[-1], call(name, level, fmt=fmt))
275
        with patch('kamaki.cli.logger._add_logger', side_effect=Exception):
276
            self.assertEqual(add_stream_logger('X'), 'my get logger ret')
277
            GL.assert_called_once_with('X')
278

    
279
    @patch('kamaki.cli.logger.logging.getLogger', return_value=PseudoLogger())
280
    def test_get_logger(self, GL):
281
        from kamaki.cli.logger import get_logger
282
        get_logger('my logger name')
283
        GL.assert_called_once_with('my logger name')
284

    
285

    
286
_RET = None
287

    
288

    
289
class PseudoException(object):
290

    
291
    def __init__(self, *args):
292
        global _RET
293
        _RET = args
294

    
295

    
296
class CLIError(TestCase):
297

    
298
    @patch('__builtin__.super', return_value=PseudoException())
299
    def test___init__(self, S):
300
        from kamaki.cli.errors import CLIError
301
        global _RET
302
        for message, details, importance in (
303
                ('some msg', [], 0),
304
                ('some msg\n', 'details', 0),
305
                ('some msg', ['details1', 'details2'], 10)):
306
            clie = CLIError(message, details, importance)
307
            self.assertEqual(S.mock_calls[-1], call(CLIError, clie))
308
            self.assertEqual(_RET[0], (message + '\n') if (
309
                message and not message.endswith('\n')) else message)
310
            self.assertEqual(clie.details, (list(details) if (
311
                isinstance(details, list)) else ['%s' % details]) if (
312
                    details) else [])
313
            self.assertEqual(clie.importance, int(importance))
314
        clie = CLIError(message, details, 'non int')
315
        self.assertEqual(clie.importance, 0)
316

    
317
    def test_raiseCLIError(self):
318
        from kamaki.cli.errors import raiseCLIError, CLIError
319
        for err, message, importance, details in (
320
                (Exception('msg'), '', 0, []),
321
                (Exception('msg'), 'orther msg', 0, []),
322
                (Exception('msg'), 'orther msg', 0, ['d1', 'd2']),
323
                (Exception('msg'), '', 10, []),
324
                (Exception('msg'), '', None, []),
325
                (CLIError('some msg'), '', None, ['d1', 'd2'])
326
            ):
327
            try:
328
                raiseCLIError(err, message, importance, details)
329
            except CLIError as clie:
330
                exp_msg = '%s' % (message or err)
331
                exp_msg += '' if exp_msg.endswith('\n') else '\n'
332
                self.assertEqual('%s' % clie, exp_msg)
333
                self.assertEqual(clie.importance, importance or 0)
334
                exp_d = list(details) if isinstance(details, list) else [
335
                    '%s' % (details or '')]
336
                base_msg = '%s' % err
337
                if message and base_msg != message:
338
                    exp_d.append(base_msg)
339
                self.assertEqual(clie.details, exp_d)
340

    
341

    
342
class CLIUnimplemented(TestCase):
343

    
344
    def test___init__(self):
345
        from kamaki.cli.errors import CLIUnimplemented
346
        cliu = CLIUnimplemented()
347
        self.assertEqual(
348
            '%s' % cliu,
349
            'I \'M SORRY, DAVE.\nI \'M AFRAID I CAN\'T DO THAT.\n')
350
        self.assertEqual(cliu.details, [
351
            '      _        |',
352
            '   _-- --_     |',
353
            '  --     --    |',
354
            ' --   .   --   |',
355
            ' -_       _-   |',
356
            '   -_   _-     |',
357
            '      -        |'])
358
        self.assertEqual(cliu.importance, 3)
359

    
360

    
361
class CLIBaseUrlError(TestCase):
362

    
363
    def test___init__(self):
364
        from kamaki.cli.errors import CLIBaseUrlError
365
        for service in ('', 'some service'):
366
            clibue = CLIBaseUrlError(service=service)
367
            self.assertEqual('%s' % clibue, 'No URL for %s\n' % service)
368
            self.assertEqual(clibue.details, [
369
                'Two ways to resolve this:',
370
                '(Use the correct cloud name, instead of "default")',
371
                'A. (recommended) Let kamaki discover endpoint URLs for all',
372
                'services by setting a single Authentication URL and token:',
373
                '  /config set cloud.default.url <AUTH_URL>',
374
                '  /config set cloud.default.token <t0k3n>',
375
                'B. (advanced users) Explicitly set an %s endpoint URL' % (
376
                    service.upper()),
377
                'Note: URL option has a higher priority, so delete it to',
378
                'make that work',
379
                '  /config delete cloud.default.url',
380
                '  /config set cloud.%s.url <%s_URL>' % (
381
                    service, service.upper())])
382
            self.assertEqual(clibue.importance, 2)
383

    
384

    
385
class CLISyntaxError(TestCase):
386

    
387
    def test___init__(self):
388
        from kamaki.cli.errors import CLISyntaxError
389
        clise = CLISyntaxError()
390
        self.assertEqual('%s' % clise, 'Syntax Error\n')
391
        self.assertEqual(clise.details, [])
392
        self.assertEqual(clise.importance, 1)
393

    
394

    
395
class CLIInvalidArgument(TestCase):
396

    
397
    def test___init__(self):
398
        from kamaki.cli.errors import CLIInvalidArgument
399
        cliia = CLIInvalidArgument()
400
        self.assertEqual('%s' % cliia, 'Invalid Argument\n')
401
        self.assertEqual(cliia.details, [])
402
        self.assertEqual(cliia.importance, 1)
403

    
404

    
405
class CLIUnknownCommand(TestCase):
406

    
407
    def test___init__(self):
408
        from kamaki.cli.errors import CLIUnknownCommand
409
        cliec = CLIUnknownCommand()
410
        self.assertEqual('%s' % cliec, 'Unknown Command\n')
411
        self.assertEqual(cliec.details, [])
412
        self.assertEqual(cliec.importance, 1)
413

    
414

    
415
class CLICmdSpecError(TestCase):
416

    
417
    def test___init__(self):
418
        from kamaki.cli.errors import CLICmdSpecError
419
        clicse = CLICmdSpecError()
420
        self.assertEqual('%s' % clicse, 'Command Specification Error\n')
421
        self.assertEqual(clicse.details, [])
422
        self.assertEqual(clicse.importance, 0)
423

    
424

    
425
#  TestCase auxiliary methods
426

    
427
def runTestCase(cls, test_name, args=[], failure_collector=[]):
428
    """
429
    :param cls: (TestCase) a set of Tests
430

431
    :param test_name: (str)
432

433
    :param args: (list) these are prefixed with test_ and used as params when
434
        instantiating cls
435

436
    :param failure_collector: (list) collects info of test failures
437

438
    :returns: (int) total # of run tests
439
    """
440
    suite = TestSuite()
441
    if args:
442
        suite.addTest(cls('_'.join(['test'] + args)))
443
    else:
444
        suite.addTest(makeSuite(cls))
445
    print('* Test * %s *' % test_name)
446
    r = TextTestRunner(verbosity=2).run(suite)
447
    failure_collector += r.failures
448
    return r.testsRun
449

    
450

    
451
def get_test_classes(module=__import__(__name__), name=''):
452
    module_stack = [module]
453
    while module_stack:
454
        module = module_stack[-1]
455
        module_stack = module_stack[:-1]
456
        for objname, obj in getmembers(module):
457
            if (objname == name or not name):
458
                if isclass(obj) and objname != 'TestCase' and (
459
                        issubclass(obj, TestCase)):
460
                    yield (obj, objname)
461

    
462

    
463
def main(argv):
464
    found = False
465
    failure_collector = list()
466
    num_of_tests = 0
467
    for cls, name in get_test_classes(name=argv[1] if len(argv) > 1 else ''):
468
        found = True
469
        num_of_tests += runTestCase(cls, name, argv[2:], failure_collector)
470
    if not found:
471
        print('Test "%s" not found' % ' '.join(argv[1:]))
472
    else:
473
        for i, failure in enumerate(failure_collector):
474
            print('Failure %s: ' % (i + 1))
475
            for field in failure:
476
                print('\t%s' % field)
477
        print('\nTotal tests run: %s' % num_of_tests)
478
        print('Total failures: %s' % len(failure_collector))
479

    
480

    
481
if __name__ == '__main__':
482
    from sys import argv
483
    main(argv)