Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / config / test.py @ 534e7bbb

History | View | Annotate | Download (19.9 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from mock import patch, call
35
from unittest import TestCase
36
from itertools import product
37
import os
38
from tempfile import NamedTemporaryFile
39
from io import StringIO
40

    
41
from kamaki.cli.config import HEADER
42

    
43

    
44
def _2steps_gen(limit=2):
45
    counter, ret = 0, None
46
    while True:
47
        if counter >= limit:
48
            ret = None if ret else 'something'
49
            counter = 0
50
        counter += 1
51
        yield ret
52

    
53
_2value_gen = _2steps_gen()
54

    
55

    
56
class Config(TestCase):
57
    """Test Config methods"""
58

    
59
    def setUp(self):
60
        self.f = NamedTemporaryFile()
61

    
62
        from kamaki.cli.config import DEFAULTS
63

    
64
        self.DEFAULTS = dict()
65
        for k, v in DEFAULTS.items():
66
            self.DEFAULTS[k] = dict(v) if isinstance(v, dict) else v
67

    
68
        self.config_file_content = [
69
            HEADER,
70
            '[global]\n',
71
            'max_threads = 5\n',
72
            'default_cloud = ~mycloud\n',
73
            'file_cli = pithos\n',
74
            'history_file = /home/user/.kamaki.history\n',
75
            'colors = off\n',
76
            'config_cli = config\n',
77
            'history_cli = history\n',
78
            'log_token = off\n',
79
            'server_cli = cyclades\n',
80
            'user_cli = astakos\n',
81
            'log_data = off\n',
82
            'flavor_cli = cyclades\n',
83
            'image_cli = image\n',
84
            'log_file = /home/user/.kamaki.log\n',
85
            'network_cli = cyclades\n',
86
            'log_pid = off\n',
87
            '\n',
88
            '[cloud "demo"]\n',
89
            'url = https://demo.example.com\n',
90
            'token = t0k3n-0f-d3m0-3x4mp13\n',
91
            '\n',
92
            '[cloud "~mycloud"]\n',
93
            'url = https://example.com\n',
94
            'pithos_container = images\n']
95

    
96
    def tearDown(self):
97
        try:
98
            self.f.close()
99
        except Exception:
100
            pass
101
        finally:
102
            from kamaki.cli.config import DEFAULTS
103
            keys = DEFAULTS.keys()
104
            for k in keys:
105
                DEFAULTS.pop(k)
106
            for k, v in self.DEFAULTS.items():
107
                DEFAULTS[k] = v
108

    
109
    @patch('kamaki.cli.config.Config.remove_section')
110
    @patch('kamaki.cli.config.Config.items', return_value=(
111
        ('k1', 'v1'), ('k2', 'v2')))
112
    @patch('kamaki.cli.config.Config.sections', return_value=('a', 'b'))
113
    @patch('kamaki.cli.config.Config.set_cloud')
114
    @patch('kamaki.cli.config.Config.read')
115
    @patch('kamaki.cli.config.Config._load_defaults')
116
    def test___init__(
117
            self, _ld, c_read, c_set_cloud, c_sections, c_items,
118
            c_remove_section):
119
        from kamaki.cli.config import (
120
            Config, RawConfigParser, CONFIG_ENV, CONFIG_PATH)
121
        _ld_num, c_sections_num, c_items_num, c_set_cloud_num = 0, 0, 0, 0
122
        c_remove_section_num, gen_call = 0, [call('a'), call('b')]
123
        for path, with_defaults in product((None, '/a/path'), (True, False)):
124
            with patch(
125
                    'kamaki.cli.config.Config._cloud_name',
126
                    return_value=_2value_gen.next()) as _cloud_name:
127
                cnf = Config(path=path, with_defaults=with_defaults)
128
                self.assertTrue(isinstance(cnf, RawConfigParser))
129
                cpath = path or os.environ.get(CONFIG_ENV, CONFIG_PATH)
130
                self.assertEqual(cnf.path, cpath)
131
                if with_defaults:
132
                    _ld_num += 1
133
                    self.assertEqual(_ld.mock_calls[-1], call())
134
                self.assertEqual(len(_ld.mock_calls), _ld_num)
135
                self.assertEqual(c_read.mock_calls[-1], call(cpath))
136

    
137
                c_sections_num += 1
138
                self.assertEqual(len(c_sections.mock_calls), c_sections_num)
139
                self.assertEqual(c_sections.mock_calls[-1], call())
140

    
141
                self.assertEqual(_cloud_name.mock_calls, gen_call)
142

    
143
                r = _2value_gen.next()
144
                if r:
145
                    c_items_num += 2
146
                    self.assertEqual(c_items.mock_calls[-2:], gen_call)
147
                    c_set_cloud_num += 4
148
                    self.assertEqual(c_set_cloud.mock_calls[-4:], [
149
                        call(r, 'k1', 'v1'), call(r, 'k2', 'v2')] * 2)
150
                    c_remove_section_num += 2
151
                    self.assertEqual(
152
                        c_remove_section.mock_calls[-2:], gen_call)
153
                self.assertEqual(len(c_items.mock_calls), c_items_num)
154
                self.assertEqual(len(c_set_cloud.mock_calls), c_set_cloud_num)
155
                self.assertEqual(
156
                    len(c_remove_section.mock_calls), c_remove_section_num)
157

    
158
    def test__cloud_name(self):
159
        from kamaki.cli.config import (
160
            Config, CLOUD_PREFIX, InvalidCloudNameError)
161
        cn = Config._cloud_name
162
        self.assertEqual(cn('non%s name' % CLOUD_PREFIX), None)
163
        for invalid in ('"!@#$%^&())_"', '"a b c"', u'"\xce\xcd"', 'naked'):
164
            self.assertRaises(
165
                InvalidCloudNameError, cn, '%s %s' % (CLOUD_PREFIX, invalid))
166
        for valid in ('word', '~okeanos', 'd0t.ted', 'ha$h#ed'):
167
            self.assertEqual(cn('%s "%s"' % (CLOUD_PREFIX, valid)), valid)
168

    
169
    def test_rescue_old_file(self):
170
        from kamaki.cli.config import Config
171

    
172
        content0 = list(self.config_file_content)
173

    
174
        def make_file(lines):
175
            f = NamedTemporaryFile()
176
            f.writelines(lines)
177
            f.flush()
178
            return f
179

    
180
        with make_file(content0) as f:
181
            _cnf = Config(path=f.name)
182
            self.assertEqual([], _cnf.rescue_old_file())
183
        del _cnf
184

    
185
        content1, sample = list(content0), 'xyz_cli = XYZ_specs'
186
        content1.insert(2, '%s\n' % sample)
187

    
188
        with make_file(content1) as f:
189
            f.seek(0)
190
            _cnf = Config(path=f.name)
191
            self.assertEqual(
192
                sorted(['global.%s' % sample]), sorted(_cnf.rescue_old_file()))
193
        del _cnf
194

    
195
        content2, sample = list(content0), 'http://www.example2.org'
196
        content2.insert(2, 'url = %s\n' % sample)
197
        err = StringIO()
198

    
199
        with make_file(content2) as f:
200
            _cnf = Config(path=f.name)
201
            self.assertEqual([], _cnf.rescue_old_file(err=err))
202
            self.assertEqual(
203
                '... rescue global.url => cloud.default.url\n', err.getvalue())
204
            self.assertEqual(sample, _cnf.get_cloud('default', 'url'))
205
        del _cnf
206

    
207
        content3 = list(content0)
208
        content3.insert(
209
            2, 'url = http://example1.com\nurl = http://example2.com\n')
210

    
211
        with make_file(content3) as f:
212
            _cnf = Config(path=f.name)
213
            self.assertEqual([], _cnf.rescue_old_file(err=err))
214
            self.assertEqual(
215
                2 * '... rescue global.url => cloud.default.url\n',
216
                err.getvalue())
217
            self.assertEqual(
218
                'http://example2.com', _cnf.get_cloud('default', 'url'))
219
        del _cnf
220

    
221
        content4 = list(content0)
222
        content4.insert(2, 'url = http://example1.com\n')
223
        content4.append('\n[cloud "default"]\nurl=http://example2.com\n')
224

    
225
        with make_file(content4) as f:
226
            _cnf = Config(path=f.name)
227
            from kamaki.cli.errors import CLISyntaxError
228
            self.assertRaises(CLISyntaxError, _cnf.rescue_old_file)
229
        del _cnf
230

    
231
        content5 = list(content0)
232
        extras = [
233
            ('pithos_cli', 'pithos'), ('store_cli', 'pithos'),
234
            ('storage_cli', 'pithos'), ('compute_cli', 'cyclades'),
235
            ('cyclades_cli', 'cyclades')]
236
        for sample in extras:
237
            content5.insert(2, '%s = %s\n' % sample)
238

    
239
        with make_file(content5) as f:
240
            _cnf = Config(path=f.name)
241
            self.assertEqual(
242
                sorted(['global.%s = %s' % sample for sample in extras]),
243
                 sorted(_cnf.rescue_old_file()))
244

    
245
    def test_guess_version(self):
246
        from kamaki.cli.config import Config
247
        from kamaki.cli.logger import add_file_logger
248

    
249
        def make_log_file():
250
            f = NamedTemporaryFile()
251
            add_file_logger('kamaki.cli.config', filename=f.name)
252
            return f
253

    
254
        def make_file(lines):
255
            f = NamedTemporaryFile()
256
            f.writelines(lines)
257
            f.flush()
258
            return f
259

    
260
        with make_file([]) as f:
261
            with make_log_file() as logf:
262
                _cnf = Config(path=f.name)
263
                self.assertEqual(0.9, _cnf.guess_version())
264
                exp = 'All heuristics failed, cannot decide\n'
265
                logf.file.seek(- len(exp), 2)
266
                self.assertEqual(exp, logf.read())
267

    
268
        content0 = list(self.config_file_content)
269

    
270
        with make_file(content0) as f:
271
            with make_log_file() as logf:
272
                _cnf = Config(path=f.name)
273
                self.assertEqual(0.9, _cnf.guess_version())
274
                exp = '... found cloud "demo"\n'
275
                logf.seek(- len(exp), 2)
276
                self.assertEqual(exp, logf.read())
277

    
278
        for term in ('url', 'token'):
279
            content1 = list(content0)
280
            content1.insert(2, '%s = some_value' % term)
281

    
282
            with make_file(content1) as f:
283
                with make_log_file() as logf:
284
                    _cnf = Config(path=f.name)
285
                    self.assertEqual(0.8, _cnf.guess_version())
286
                    exp = '..... config file has an old global section\n'
287
                    logf.seek(- len(exp), 2)
288
                    self.assertEqual(exp, logf.read())
289

    
290
    def test_get_cloud(self):
291
        from kamaki.cli.config import Config, CLOUD_PREFIX
292

    
293
        _cnf = Config(path=self.f.name)
294
        d = dict(opt1='v1', opt2='v2')
295
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
296
            self.assertEqual('v1', _cnf.get_cloud('mycloud', 'opt1'))
297
            self.assertEqual(
298
                get.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud'))
299
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt3')
300
        with patch('kamaki.cli.config.Config.get', return_value=0) as get:
301
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt1')
302

    
303
    @patch('kamaki.cli.config.Config.set')
304
    def test_set_cloud(self, c_set):
305
        from kamaki.cli.config import Config, CLOUD_PREFIX
306
        _cnf = Config(path=self.f.name)
307

    
308
        d = dict(k='v')
309
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
310
            _cnf.set_cloud('mycloud', 'opt', 'val')
311
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
312
            d['opt'] = 'val'
313
            self.assertEqual(
314
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
315

    
316
        with patch('kamaki.cli.config.Config.get', return_value=None) as get:
317
            _cnf.set_cloud('mycloud', 'opt', 'val')
318
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
319
            d = dict(opt='val')
320
            self.assertEqual(
321
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
322

    
323
        with patch(
324
                'kamaki.cli.config.Config.get', side_effect=KeyError()) as get:
325
            _cnf.set_cloud('mycloud', 'opt', 'val')
326
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
327
            d = dict(opt='val')
328
            self.assertEqual(
329
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
330

    
331
    def test__load_defaults(self):
332
        from kamaki.cli.config import Config, DEFAULTS
333
        _cnf = Config(path=self.f.name)
334

    
335
        with patch('kamaki.cli.config.Config.set') as c_set:
336
            _cnf._load_defaults()
337
            for i, (section, options) in enumerate(DEFAULTS.items()):
338
                for j, (option, val) in enumerate(options.items()):
339
                    self.assertEqual(
340
                        c_set.mock_calls[(i + 1) * j],
341
                        call(section, option, val))
342

    
343
    def test__get_dict(self):
344
        from kamaki.cli.config import Config, CLOUD_PREFIX, DEFAULTS
345

    
346
        def make_file(lines):
347
            f = NamedTemporaryFile()
348
            f.writelines(lines)
349
            f.flush()
350
            return f
351

    
352
        with make_file([]) as f:
353
            _cnf = Config(path=f.name)
354
            for term in ('global', CLOUD_PREFIX):
355
                self.assertEqual(DEFAULTS[term], _cnf._get_dict(term))
356
            for term in ('nosection', ''):
357
                self.assertEqual({}, _cnf._get_dict(term))
358

    
359
        with make_file(self.config_file_content) as f:
360
            _cnf = Config(path=f.name)
361
            for term in ('global', CLOUD_PREFIX):
362
                self.assertNotEqual(DEFAULTS[term], _cnf._get_dict(term))
363

    
364
    def test_reload(self):
365
        from kamaki.cli.config import Config
366
        _cnf = Config(path=self.f.name)
367

    
368
        with patch('kamaki.cli.config.Config.__init__') as i:
369
            _cnf.reload()
370
            i.assert_called_once_with(self.f.name)
371

    
372
    @patch('kamaki.cli.config.Config.get_cloud', return_value='get cloud')
373
    def test_get(self, get_cloud):
374
        from kamaki.cli.config import Config
375
        _cnf = Config(path=self.f.name)
376
        self.assertEqual('pithos', _cnf.get('global', 'file_cli'))
377
        self.assertEqual(get_cloud.mock_calls, [])
378
        for opt, sec in (('cloud', 'non-existing'), ('non-opt', 'exists')):
379
            self.assertEqual(None, _cnf.get(opt, sec))
380
            self.assertEqual(get_cloud.mock_calls, [])
381
        self.assertEqual('get cloud', _cnf.get('cloud.demo', 'url'))
382
        self.assertEqual(get_cloud.mock_calls[-1], call('demo', 'url'))
383

    
384
    def test_set(self):
385
        from kamaki.cli.config import Config, CLOUD_PREFIX
386
        _cnf = Config(path=self.f.name)
387

    
388
        with patch(
389
                'kamaki.cli.config.Config._cloud_name',
390
                return_value='cn') as _cloud_name:
391
            with patch(
392
                    'kamaki.cli.config.Config.set_cloud',
393
                    return_value='sc') as set_cloud:
394
                self.assertEqual(
395
                    'sc', _cnf.set('%s.sec' % CLOUD_PREFIX, 'opt', 'val'))
396
                self.assertEqual(
397
                    _cloud_name.mock_calls[-1],
398
                    call('%s "sec"' % CLOUD_PREFIX))
399
                self.assertEqual(
400
                    set_cloud.mock_calls[-1], call('cn', 'opt', 'val'))
401

    
402
                self.assertTrue(len(_cnf.items('global')) > 0)
403
                self.assertEqual(None, _cnf.set('global', 'opt', 'val'))
404
                self.assertTrue(('opt', 'val') in _cnf.items('global'))
405

    
406
                self.assertTrue(len(_cnf.items('new')) == 0)
407
                self.assertEqual(None, _cnf.set('new', 'opt', 'val'))
408
                self.assertTrue(('opt', 'val') in _cnf.items('new'))
409

    
410
    def test_remove_option(self):
411
        from kamaki.cli.config import Config
412
        _cnf = Config(path=self.f.name)
413

    
414
        self.assertEqual(len(_cnf.items('no-section')), 0)
415
        _cnf.remove_option('no-section', 'opt', False)
416
        self.assertEqual(len(_cnf.items('no-section')), 0)
417
        _cnf.remove_option('no-section', 'opt', True)
418
        self.assertEqual(len(_cnf.items('no-section')), 0)
419

    
420
        opt_num = len(_cnf.items('global'))
421
        self.assertTrue(opt_num > 0)
422
        _cnf.remove_option('global', 'file_cli', False)
423
        self.assertEqual(len(_cnf.items('global')), opt_num)
424
        _cnf.remove_option('global', 'file_cli', True)
425
        self.assertEqual(len(_cnf.items('global')), opt_num - 1)
426

    
427
        _cnf.set('global', 'server_cli', 'alt-server')
428
        self.assertTrue(('server_cli', 'alt-server') in _cnf.items('global'))
429
        self.assertFalse(('server_cli', 'cyclades') in _cnf.items('global'))
430
        _cnf.remove_option('global', 'server_cli', False)
431
        self.assertFalse(('server_cli', 'alt-server') in _cnf.items('global'))
432
        self.assertTrue(('server_cli', 'cyclades') in _cnf.items('global'))
433
        _cnf.remove_option('global', 'server_cli', True)
434
        self.assertFalse(('server_cli', 'alt-server') in _cnf.items('global'))
435
        self.assertFalse(('server_cli', 'cyclades') in _cnf.items('global'))
436

    
437
    def test_remove_from_cloud(self):
438
        from kamaki.cli.config import Config, CLOUD_PREFIX
439
        _cnf = Config(path=self.f.name)
440

    
441
        d = dict(k1='v1', k2='v2')
442
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
443
            _cnf.remove_from_cloud('cld', 'k1')
444
            self.assertEqual(d, dict(k2='v2'))
445
            self.assertRaises(KeyError, _cnf.remove_from_cloud, 'cld', 'opt')
446
            self.assertEqual(get.mock_calls, 2 * [call(CLOUD_PREFIX, 'cld')])
447

    
448
    @patch(
449
        'kamaki.cli.config.Config._get_dict',
450
        return_value={'k1': 'v1', 'k2': 'v2'})
451
    def test_keys(self, _get_dict):
452
        from kamaki.cli.config import Config
453
        _cnf = Config(path=self.f.name)
454

    
455
        self.assertEqual(
456
            sorted(['k1', 'k2']), sorted(_cnf.keys('opt', 'boolean')))
457
        _get_dict.assert_called_once_with('opt', 'boolean')
458

    
459
    @patch(
460
        'kamaki.cli.config.Config._get_dict',
461
        return_value={'k1': 'v1', 'k2': 'v2'})
462
    def test_items(self, _get_dict):
463
        from kamaki.cli.config import Config
464
        _cnf = Config(path=self.f.name)
465

    
466
        self.assertEqual(
467
            sorted([('k1', 'v1'), ('k2', 'v2')]),
468
            sorted(_cnf.items('opt', 'boolean')))
469
        _get_dict.assert_called_once_with('opt', 'boolean')
470

    
471
    def test_override(self):
472
        from kamaki.cli.config import Config
473
        _cnf = Config(path=self.f.name)
474

    
475
        _cnf.override('sec', 'opt', 'val')
476
        self.assertEqual(_cnf._overrides['sec']['opt'], 'val')
477

    
478
    def test_write(self):
479
        from kamaki.cli.config import Config, DEFAULTS
480
        _cnf = Config(path=self.f.name)
481

    
482
        exp = '%s[global]\n' % HEADER
483
        exp += ''.join([
484
            '%s = %s\n' % (k, v) for k, v in DEFAULTS['global'].items()])
485
        exp += '\n'
486

    
487
        _cnf.write()
488
        self.f.seek(0)
489
        self.assertEqual(self.f.read(), exp)
490

    
491
        del _cnf
492
        with NamedTemporaryFile() as f:
493
            f.write('\n'.join(self.config_file_content))
494
            f.flush()
495
            _cnf = Config(path=f.name)
496
            f.seek(0)
497
            self.assertEqual(f.read(), '\n'.join(self.config_file_content))
498
            _cnf.write()
499
            f.seek(0)
500
            file_contents = f.read()
501
            for line in self.config_file_content:
502
                self.assertTrue(line in file_contents)
503
            _cnf.set('sec', 'opt', 'val')
504
            _cnf.set('global', 'opt', 'val')
505
            _cnf.set('global', 'file_cli', 'val')
506
            _cnf.write()
507
            f.seek(0)
508
            file_contents = f.read()
509
            for line in ('file_cli = val\n', '[sec]\n', 'opt = val\n'):
510
                self.assertTrue(line in file_contents)
511

    
512

    
513
if __name__ == '__main__':
514
    from sys import argv
515
    from kamaki.cli.test import runTestCase
516
    runTestCase(Config, 'Config', argv[1:])