Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / config / test.py @ 8556d269

History | View | Annotate | Download (20.5 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from mock import patch, call
35
from unittest import TestCase
36
from itertools import product
37
import os
38
from tempfile import NamedTemporaryFile
39
from io import StringIO
40

    
41
from kamaki.cli.config import HEADER
42

    
43

    
44
def _2steps_gen(limit=2):
45
    counter, ret = 0, None
46
    while True:
47
        if counter >= limit:
48
            ret = None if ret else 'something'
49
            counter = 0
50
        counter += 1
51
        yield ret
52

    
53
_2value_gen = _2steps_gen()
54

    
55

    
56
class Config(TestCase):
57
    """Test Config methods"""
58

    
59
    def setUp(self):
60
        self.f = NamedTemporaryFile()
61

    
62
        from kamaki.cli.config import DEFAULTS
63

    
64
        self.DEFAULTS = dict()
65
        for k, v in DEFAULTS.items():
66
            self.DEFAULTS[k] = dict(v) if isinstance(v, dict) else v
67

    
68
        self.config_file_content = [
69
            HEADER,
70
            '[global]\n',
71
            'max_threads = 5\n',
72
            'default_cloud = ~mycloud\n',
73
            'file_cli = pithos\n',
74
            'history_file = /home/user/.kamaki.history\n',
75
            'colors = off\n',
76
            'config_cli = config\n',
77
            'history_cli = history\n',
78
            'log_token = off\n',
79
            'server_cli = cyclades\n',
80
            'user_cli = astakos\n',
81
            'log_data = off\n',
82
            'flavor_cli = cyclades\n',
83
            'image_cli = image\n',
84
            'log_file = /home/user/.kamaki.log\n',
85
            'network_cli = cyclades\n',
86
            'log_pid = off\n',
87
            '\n',
88
            '[cloud "demo"]\n',
89
            'url = https://demo.example.com\n',
90
            'token = t0k3n-0f-d3m0-3x4mp13\n',
91
            '\n',
92
            '[cloud "~mycloud"]\n',
93
            'url = https://example.com\n',
94
            'pithos_container = images\n']
95

    
96
    def tearDown(self):
97
        try:
98
            self.f.close()
99
        except Exception:
100
            pass
101
        finally:
102
            from kamaki.cli.config import DEFAULTS
103
            keys = DEFAULTS.keys()
104
            for k in keys:
105
                DEFAULTS.pop(k)
106
            for k, v in self.DEFAULTS.items():
107
                DEFAULTS[k] = v
108

    
109
    @patch('kamaki.cli.config.Config.remove_section')
110
    @patch('kamaki.cli.config.Config.items', return_value=(
111
        ('k1', 'v1'), ('k2', 'v2')))
112
    @patch('kamaki.cli.config.Config.sections', return_value=('a', 'b'))
113
    @patch('kamaki.cli.config.Config.set_cloud')
114
    @patch('kamaki.cli.config.Config.read')
115
    @patch('kamaki.cli.config.Config._load_defaults')
116
    def test___init__(
117
            self, _ld, c_read, c_set_cloud, c_sections, c_items,
118
            c_remove_section):
119
        from kamaki.cli.config import (
120
            Config, RawConfigParser, CONFIG_ENV, CONFIG_PATH)
121
        _ld_num, c_sections_num, c_items_num, c_set_cloud_num = 0, 0, 0, 0
122
        c_remove_section_num, gen_call = 0, [call('a'), call('b')]
123
        for path, with_defaults in product((None, '/a/path'), (True, False)):
124
            with patch(
125
                    'kamaki.cli.config.Config._cloud_name',
126
                    return_value=_2value_gen.next()) as _cloud_name:
127
                cnf = Config(path=path, with_defaults=with_defaults)
128
                self.assertTrue(isinstance(cnf, RawConfigParser))
129
                cpath = path or os.environ.get(CONFIG_ENV, CONFIG_PATH)
130
                self.assertEqual(cnf.path, cpath)
131
                if with_defaults:
132
                    _ld_num += 1
133
                    self.assertEqual(_ld.mock_calls[-1], call())
134
                self.assertEqual(len(_ld.mock_calls), _ld_num)
135
                self.assertEqual(c_read.mock_calls[-1], call(cpath))
136

    
137
                c_sections_num += 1
138
                self.assertEqual(len(c_sections.mock_calls), c_sections_num)
139
                self.assertEqual(c_sections.mock_calls[-1], call())
140

    
141
                self.assertEqual(_cloud_name.mock_calls, gen_call)
142

    
143
                r = _2value_gen.next()
144
                if r:
145
                    c_items_num += 2
146
                    self.assertEqual(c_items.mock_calls[-2:], gen_call)
147
                    c_set_cloud_num += 4
148
                    self.assertEqual(c_set_cloud.mock_calls[-4:], [
149
                        call(r, 'k1', 'v1'), call(r, 'k2', 'v2')] * 2)
150
                    c_remove_section_num += 2
151
                    self.assertEqual(
152
                        c_remove_section.mock_calls[-2:], gen_call)
153
                self.assertEqual(len(c_items.mock_calls), c_items_num)
154
                self.assertEqual(len(c_set_cloud.mock_calls), c_set_cloud_num)
155
                self.assertEqual(
156
                    len(c_remove_section.mock_calls), c_remove_section_num)
157

    
158
    def test__cloud_name(self):
159
        from kamaki.cli.config import (
160
            Config, CLOUD_PREFIX, InvalidCloudNameError)
161
        cn = Config._cloud_name
162
        self.assertEqual(cn('non%s name' % CLOUD_PREFIX), None)
163
        for invalid in ('"!@#$%^&())_"', '"a b c"', u'"\xce\xcd"', 'naked'):
164
            self.assertRaises(
165
                InvalidCloudNameError, cn, '%s %s' % (CLOUD_PREFIX, invalid))
166
        for valid in ('word', '~okeanos', 'd0t.ted', 'ha$h#ed'):
167
            self.assertEqual(cn('%s "%s"' % (CLOUD_PREFIX, valid)), valid)
168

    
169
    def test_rescue_old_file(self):
170
        from kamaki.cli.config import Config
171

    
172
        content0 = list(self.config_file_content)
173

    
174
        def make_file(lines):
175
            f = NamedTemporaryFile()
176
            f.writelines(lines)
177
            f.flush()
178
            return f
179

    
180
        with make_file(content0) as f:
181
            _cnf = Config(path=f.name)
182
            self.assertEqual([], _cnf.rescue_old_file())
183
        del _cnf
184

    
185
        content1, sample = list(content0), 'xyz_cli = XYZ_specs'
186
        content1.insert(2, '%s\n' % sample)
187

    
188
        with make_file(content1) as f:
189
            f.seek(0)
190
            _cnf = Config(path=f.name)
191
            self.assertEqual(
192
                sorted(['global.%s' % sample]), sorted(_cnf.rescue_old_file()))
193
        del _cnf
194

    
195
        content2, sample = list(content0), 'http://www.example2.org'
196
        content2.insert(2, 'url = %s\n' % sample)
197
        err = StringIO()
198

    
199
        with make_file(content2) as f:
200
            _cnf = Config(path=f.name)
201
            self.assertEqual([], _cnf.rescue_old_file(err=err))
202
            self.assertEqual(
203
                '... rescue global.url => cloud.default.url\n', err.getvalue())
204
            self.assertEqual(sample, _cnf.get_cloud('default', 'url'))
205
        del _cnf
206

    
207
        content3 = list(content0)
208
        content3.insert(
209
            2, 'url = http://example1.com\nurl = http://example2.com\n')
210

    
211
        with make_file(content3) as f:
212
            _cnf = Config(path=f.name)
213
            self.assertEqual([], _cnf.rescue_old_file(err=err))
214
            self.assertEqual(
215
                2 * '... rescue global.url => cloud.default.url\n',
216
                err.getvalue())
217
            self.assertEqual(
218
                'http://example2.com', _cnf.get_cloud('default', 'url'))
219
        del _cnf
220

    
221
        content4 = list(content0)
222
        content4.insert(2, 'url = http://example1.com\n')
223
        content4.append('\n[cloud "default"]\nurl=http://example2.com\n')
224

    
225
        with make_file(content4) as f:
226
            _cnf = Config(path=f.name)
227
            from kamaki.cli.errors import CLISyntaxError
228
            self.assertRaises(CLISyntaxError, _cnf.rescue_old_file)
229
        del _cnf
230

    
231
        content5 = list(content0)
232
        extras = [
233
            ('pithos_cli', 'pithos'), ('store_cli', 'pithos'),
234
            ('storage_cli', 'pithos'), ('compute_cli', 'cyclades'),
235
            ('cyclades_cli', 'cyclades')]
236
        for sample in extras:
237
            content5.insert(2, '%s = %s\n' % sample)
238

    
239
        with make_file(content5) as f:
240
            _cnf = Config(path=f.name)
241
            self.assertEqual(
242
                sorted(['global.%s = %s' % sample for sample in extras]),
243
                 sorted(_cnf.rescue_old_file()))
244

    
245
    def test_guess_version(self):
246
        from kamaki.cli.config import Config
247
        from kamaki.cli.logger import add_file_logger
248

    
249
        def make_log_file():
250
            f = NamedTemporaryFile()
251
            add_file_logger('kamaki.cli.config', filename=f.name)
252
            return f
253

    
254
        def make_file(lines):
255
            f = NamedTemporaryFile()
256
            f.writelines(lines)
257
            f.flush()
258
            return f
259

    
260
        with make_file([]) as f:
261
            with make_log_file() as logf:
262
                _cnf = Config(path=f.name)
263
                self.assertEqual(0.9, _cnf.guess_version())
264
                exp = 'All heuristics failed, cannot decide\n'
265
                logf.file.seek(- len(exp), 2)
266
                self.assertEqual(exp, logf.read())
267

    
268
        content0 = list(self.config_file_content)
269

    
270
        with make_file(content0) as f:
271
            with make_log_file() as logf:
272
                _cnf = Config(path=f.name)
273
                self.assertEqual(0.9, _cnf.guess_version())
274
                exp = '... found cloud "demo"\n'
275
                logf.seek(- len(exp), 2)
276
                self.assertEqual(exp, logf.read())
277

    
278
        for term in ('url', 'token'):
279
            content1 = list(content0)
280
            content1.insert(2, '%s = some_value' % term)
281

    
282
            with make_file(content1) as f:
283
                with make_log_file() as logf:
284
                    _cnf = Config(path=f.name)
285
                    self.assertEqual(0.8, _cnf.guess_version())
286
                    exp = '..... config file has an old global section\n'
287
                    logf.seek(- len(exp), 2)
288
                    self.assertEqual(exp, logf.read())
289

    
290
    def test_get_cloud(self):
291
        from kamaki.cli.config import Config, CLOUD_PREFIX
292

    
293
        _cnf = Config(path=self.f.name)
294
        d = dict(opt1='v1', opt2='v2')
295
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
296
            self.assertEqual('v1', _cnf.get_cloud('mycloud', 'opt1'))
297
            self.assertEqual(
298
                get.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud'))
299
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt3')
300
        with patch('kamaki.cli.config.Config.get', return_value=0) as get:
301
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt1')
302

    
303
    def test_get_global(self):
304
        from kamaki.cli.config import Config
305

    
306
        _cnf = Config(path=self.f.name)
307
        with patch('kamaki.cli.config.Config.get', return_value='val') as get:
308
            self.assertEqual('val', _cnf.get_global('opt'))
309
            get.assert_called_once_with('global', 'opt')
310

    
311
    @patch('kamaki.cli.config.Config.set')
312
    def test_set_cloud(self, c_set):
313
        from kamaki.cli.config import Config, CLOUD_PREFIX
314
        _cnf = Config(path=self.f.name)
315

    
316
        d = dict(k='v')
317
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
318
            _cnf.set_cloud('mycloud', 'opt', 'val')
319
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
320
            d['opt'] = 'val'
321
            self.assertEqual(
322
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
323

    
324
        with patch('kamaki.cli.config.Config.get', return_value=None) as get:
325
            _cnf.set_cloud('mycloud', 'opt', 'val')
326
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
327
            d = dict(opt='val')
328
            self.assertEqual(
329
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
330

    
331
        with patch(
332
                'kamaki.cli.config.Config.get', side_effect=KeyError()) as get:
333
            _cnf.set_cloud('mycloud', 'opt', 'val')
334
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
335
            d = dict(opt='val')
336
            self.assertEqual(
337
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
338

    
339
    def test_set_global(self):
340
        from kamaki.cli.config import Config
341
        _cnf = Config(path=self.f.name)
342

    
343
        with patch('kamaki.cli.config.Config.set') as c_set:
344
            _cnf.set_global('opt', 'val')
345
            c_set.assert_called_once_with('global', 'opt', 'val')
346

    
347
    def test__load_defaults(self):
348
        from kamaki.cli.config import Config, DEFAULTS
349
        _cnf = Config(path=self.f.name)
350

    
351
        with patch('kamaki.cli.config.Config.set') as c_set:
352
            _cnf._load_defaults()
353
            for i, (section, options) in enumerate(DEFAULTS.items()):
354
                for j, (option, val) in enumerate(options.items()):
355
                    self.assertEqual(
356
                        c_set.mock_calls[(i + 1) * j],
357
                        call(section, option, val))
358

    
359
    def test__get_dict(self):
360
        from kamaki.cli.config import Config, CLOUD_PREFIX, DEFAULTS
361

    
362
        def make_file(lines):
363
            f = NamedTemporaryFile()
364
            f.writelines(lines)
365
            f.flush()
366
            return f
367

    
368
        with make_file([]) as f:
369
            _cnf = Config(path=f.name)
370
            for term in ('global', CLOUD_PREFIX):
371
                self.assertEqual(DEFAULTS[term], _cnf._get_dict(term))
372
            for term in ('nosection', ''):
373
                self.assertEqual({}, _cnf._get_dict(term))
374

    
375
        with make_file(self.config_file_content) as f:
376
            _cnf = Config(path=f.name)
377
            for term in ('global', CLOUD_PREFIX):
378
                self.assertNotEqual(DEFAULTS[term], _cnf._get_dict(term))
379

    
380
    def test_reload(self):
381
        from kamaki.cli.config import Config
382
        _cnf = Config(path=self.f.name)
383

    
384
        with patch('kamaki.cli.config.Config.__init__') as i:
385
            _cnf.reload()
386
            i.assert_called_once_with(self.f.name)
387

    
388
    @patch('kamaki.cli.config.Config.get_cloud', return_value='get cloud')
389
    def test_get(self, get_cloud):
390
        from kamaki.cli.config import Config
391
        _cnf = Config(path=self.f.name)
392
        self.assertEqual('pithos', _cnf.get('global', 'file_cli'))
393
        self.assertEqual(get_cloud.mock_calls, [])
394
        for opt, sec in (('cloud', 'non-existing'), ('non-opt', 'exists')):
395
            self.assertEqual(None, _cnf.get(opt, sec))
396
            self.assertEqual(get_cloud.mock_calls, [])
397
        self.assertEqual('get cloud', _cnf.get('cloud.demo', 'url'))
398
        self.assertEqual(get_cloud.mock_calls[-1], call('demo', 'url'))
399

    
400
    def test_set(self):
401
        from kamaki.cli.config import Config, CLOUD_PREFIX
402
        _cnf = Config(path=self.f.name)
403

    
404
        with patch(
405
                'kamaki.cli.config.Config._cloud_name',
406
                return_value='cn') as _cloud_name:
407
            with patch(
408
                    'kamaki.cli.config.Config.set_cloud',
409
                    return_value='sc') as set_cloud:
410
                self.assertEqual(
411
                    'sc', _cnf.set('%s.sec' % CLOUD_PREFIX, 'opt', 'val'))
412
                self.assertEqual(
413
                    _cloud_name.mock_calls[-1],
414
                    call('%s "sec"' % CLOUD_PREFIX))
415
                self.assertEqual(
416
                    set_cloud.mock_calls[-1], call('cn', 'opt', 'val'))
417

    
418
                self.assertTrue(len(_cnf.items('global')) > 0)
419
                self.assertEqual(None, _cnf.set('global', 'opt', 'val'))
420
                self.assertTrue(('opt', 'val') in _cnf.items('global'))
421

    
422
                self.assertTrue(len(_cnf.items('new')) == 0)
423
                self.assertEqual(None, _cnf.set('new', 'opt', 'val'))
424
                self.assertTrue(('opt', 'val') in _cnf.items('new'))
425

    
426
    def test_remove_option(self):
427
        from kamaki.cli.config import Config
428
        _cnf = Config(path=self.f.name)
429

    
430
        self.assertEqual(len(_cnf.items('no-section')), 0)
431
        _cnf.remove_option('no-section', 'opt', False)
432
        self.assertEqual(len(_cnf.items('no-section')), 0)
433
        _cnf.remove_option('no-section', 'opt', True)
434
        self.assertEqual(len(_cnf.items('no-section')), 0)
435

    
436
        opt_num = len(_cnf.items('global'))
437
        self.assertTrue(opt_num > 0)
438
        _cnf.remove_option('global', 'file_cli', False)
439
        self.assertEqual(len(_cnf.items('global')), opt_num)
440
        _cnf.remove_option('global', 'file_cli', True)
441
        self.assertEqual(len(_cnf.items('global')), opt_num - 1)
442

    
443
        _cnf.set('global', 'server_cli', 'alt-server')
444
        self.assertTrue(('server_cli', 'alt-server') in _cnf.items('global'))
445
        self.assertFalse(('server_cli', 'cyclades') in _cnf.items('global'))
446
        _cnf.remove_option('global', 'server_cli', False)
447
        self.assertFalse(('server_cli', 'alt-server') in _cnf.items('global'))
448
        self.assertTrue(('server_cli', 'cyclades') in _cnf.items('global'))
449
        _cnf.remove_option('global', 'server_cli', True)
450
        self.assertFalse(('server_cli', 'alt-server') in _cnf.items('global'))
451
        self.assertFalse(('server_cli', 'cyclades') in _cnf.items('global'))
452

    
453
    def test_remove_from_cloud(self):
454
        from kamaki.cli.config import Config, CLOUD_PREFIX
455
        _cnf = Config(path=self.f.name)
456

    
457
        d = dict(k1='v1', k2='v2')
458
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
459
            _cnf.remove_from_cloud('cld', 'k1')
460
            self.assertEqual(d, dict(k2='v2'))
461
            self.assertRaises(KeyError, _cnf.remove_from_cloud, 'cld', 'opt')
462
            self.assertEqual(get.mock_calls, 2 * [call(CLOUD_PREFIX, 'cld')])
463

    
464
    @patch(
465
        'kamaki.cli.config.Config._get_dict',
466
        return_value={'k1': 'v1', 'k2': 'v2'})
467
    def test_keys(self, _get_dict):
468
        from kamaki.cli.config import Config
469
        _cnf = Config(path=self.f.name)
470

    
471
        self.assertEqual(
472
            sorted(['k1', 'k2']), sorted(_cnf.keys('opt', 'boolean')))
473
        _get_dict.assert_called_once_with('opt', 'boolean')
474

    
475
    @patch(
476
        'kamaki.cli.config.Config._get_dict',
477
        return_value={'k1': 'v1', 'k2': 'v2'})
478
    def test_items(self, _get_dict):
479
        from kamaki.cli.config import Config
480
        _cnf = Config(path=self.f.name)
481

    
482
        self.assertEqual(
483
            sorted([('k1', 'v1'), ('k2', 'v2')]),
484
            sorted(_cnf.items('opt', 'boolean')))
485
        _get_dict.assert_called_once_with('opt', 'boolean')
486

    
487
    def test_override(self):
488
        from kamaki.cli.config import Config
489
        _cnf = Config(path=self.f.name)
490

    
491
        _cnf.override('sec', 'opt', 'val')
492
        self.assertEqual(_cnf._overrides['sec']['opt'], 'val')
493

    
494
    def test_write(self):
495
        from kamaki.cli.config import Config, DEFAULTS
496
        _cnf = Config(path=self.f.name)
497

    
498
        exp = '%s[global]\n' % HEADER
499
        exp += ''.join([
500
            '%s = %s\n' % (k, v) for k, v in DEFAULTS['global'].items()])
501
        exp += '\n'
502

    
503
        _cnf.write()
504
        self.f.seek(0)
505
        self.assertEqual(self.f.read(), exp)
506

    
507
        del _cnf
508
        with NamedTemporaryFile() as f:
509
            f.write('\n'.join(self.config_file_content))
510
            f.flush()
511
            _cnf = Config(path=f.name)
512
            f.seek(0)
513
            self.assertEqual(f.read(), '\n'.join(self.config_file_content))
514
            _cnf.write()
515
            f.seek(0)
516
            file_contents = f.read()
517
            for line in self.config_file_content:
518
                self.assertTrue(line in file_contents)
519
            _cnf.set('sec', 'opt', 'val')
520
            _cnf.set('global', 'opt', 'val')
521
            _cnf.set('global', 'file_cli', 'val')
522
            _cnf.write()
523
            f.seek(0)
524
            file_contents = f.read()
525
            for line in ('file_cli = val\n', '[sec]\n', 'opt = val\n'):
526
                self.assertTrue(line in file_contents)
527

    
528

    
529
if __name__ == '__main__':
530
    from sys import argv
531
    from kamaki.cli.test import runTestCase
532
    runTestCase(Config, 'Config', argv[1:])