Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / config / test.py @ 9b3c8fd9

History | View | Annotate | Download (19.9 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from mock import patch, call
35
from unittest import TestCase
36
from itertools import product
37
import os
38
from tempfile import NamedTemporaryFile
39
from io import StringIO
40

    
41
from kamaki.cli.config import HEADER
42

    
43

    
44
def _2steps_gen(limit=2):
45
    counter, ret = 0, None
46
    while True:
47
        if counter >= limit:
48
            ret = None if ret else 'something'
49
            counter = 0
50
        counter += 1
51
        yield ret
52

    
53
_2value_gen = _2steps_gen()
54

    
55

    
56
class Config(TestCase):
57
    """Test Config methods"""
58

    
59
    def setUp(self):
60
        self.f = NamedTemporaryFile()
61

    
62
        from kamaki.cli.config import DEFAULTS
63

    
64
        self.DEFAULTS = dict()
65
        for k, v in DEFAULTS.items():
66
            self.DEFAULTS[k] = dict(v) if isinstance(v, dict) else v
67

    
68
        self.config_file_content = [
69
            HEADER,
70
            '[global]\n',
71
            'default_cloud = ~mycloud\n',
72
            'file_cli = pithos\n',
73
            'history_file = /home/user/.kamaki.history\n',
74
            'colors = off\n',
75
            'config_cli = config\n',
76
            'history_cli = history\n',
77
            'log_token = off\n',
78
            'server_cli = cyclades\n',
79
            'user_cli = astakos\n',
80
            'log_data = off\n',
81
            'flavor_cli = cyclades\n',
82
            'image_cli = image\n',
83
            'log_file = /home/user/.kamaki.log\n',
84
            'network_cli = cyclades\n',
85
            'log_pid = off\n',
86
            '\n',
87
            '[cloud "demo"]\n',
88
            'url = https://demo.example.com\n',
89
            'token = t0k3n-0f-d3m0-3x4mp13\n',
90
            '\n',
91
            '[cloud "~mycloud"]\n',
92
            'url = https://example.com\n',
93
            'pithos_container = images\n']
94

    
95
    def tearDown(self):
96
        try:
97
            self.f.close()
98
        except Exception:
99
            pass
100
        finally:
101
            from kamaki.cli.config import DEFAULTS
102
            keys = DEFAULTS.keys()
103
            for k in keys:
104
                DEFAULTS.pop(k)
105
            for k, v in self.DEFAULTS.items():
106
                DEFAULTS[k] = v
107

    
108
    @patch('kamaki.cli.config.Config.remove_section')
109
    @patch('kamaki.cli.config.Config.items', return_value=(
110
        ('k1', 'v1'), ('k2', 'v2')))
111
    @patch('kamaki.cli.config.Config.sections', return_value=('a', 'b'))
112
    @patch('kamaki.cli.config.Config.set_cloud')
113
    @patch('kamaki.cli.config.Config.read')
114
    @patch('kamaki.cli.config.Config._load_defaults')
115
    def test___init__(
116
            self, _ld, c_read, c_set_cloud, c_sections, c_items,
117
            c_remove_section):
118
        from kamaki.cli.config import (
119
            Config, RawConfigParser, CONFIG_ENV, CONFIG_PATH)
120
        _ld_num, c_sections_num, c_items_num, c_set_cloud_num = 0, 0, 0, 0
121
        c_remove_section_num, gen_call = 0, [call('a'), call('b')]
122
        for path, with_defaults in product((None, '/a/path'), (True, False)):
123
            with patch(
124
                    'kamaki.cli.config.Config._cloud_name',
125
                    return_value=_2value_gen.next()) as _cloud_name:
126
                cnf = Config(path=path, with_defaults=with_defaults)
127
                self.assertTrue(isinstance(cnf, RawConfigParser))
128
                cpath = path or os.environ.get(CONFIG_ENV, CONFIG_PATH)
129
                self.assertEqual(cnf.path, cpath)
130
                if with_defaults:
131
                    _ld_num += 1
132
                    self.assertEqual(_ld.mock_calls[-1], call())
133
                self.assertEqual(len(_ld.mock_calls), _ld_num)
134
                self.assertEqual(c_read.mock_calls[-1], call(cpath))
135

    
136
                c_sections_num += 1
137
                self.assertEqual(len(c_sections.mock_calls), c_sections_num)
138
                self.assertEqual(c_sections.mock_calls[-1], call())
139

    
140
                self.assertEqual(_cloud_name.mock_calls, gen_call)
141

    
142
                r = _2value_gen.next()
143
                if r:
144
                    c_items_num += 2
145
                    self.assertEqual(c_items.mock_calls[-2:], gen_call)
146
                    c_set_cloud_num += 4
147
                    self.assertEqual(c_set_cloud.mock_calls[-4:], [
148
                        call(r, 'k1', 'v1'), call(r, 'k2', 'v2')] * 2)
149
                    c_remove_section_num += 2
150
                    self.assertEqual(
151
                        c_remove_section.mock_calls[-2:], gen_call)
152
                self.assertEqual(len(c_items.mock_calls), c_items_num)
153
                self.assertEqual(len(c_set_cloud.mock_calls), c_set_cloud_num)
154
                self.assertEqual(
155
                    len(c_remove_section.mock_calls), c_remove_section_num)
156

    
157
    def test__cloud_name(self):
158
        from kamaki.cli.config import (
159
            Config, CLOUD_PREFIX, InvalidCloudNameError)
160
        cn = Config._cloud_name
161
        self.assertEqual(cn('non%s name' % CLOUD_PREFIX), None)
162
        for invalid in ('"!@#$%^&())_"', '"a b c"', u'"\xce\xcd"', 'naked'):
163
            self.assertRaises(
164
                InvalidCloudNameError, cn, '%s %s' % (CLOUD_PREFIX, invalid))
165
        for valid in ('word', '~okeanos', 'd0t.ted', 'ha$h#ed'):
166
            self.assertEqual(cn('%s "%s"' % (CLOUD_PREFIX, valid)), valid)
167

    
168
    def test_rescue_old_file(self):
169
        from kamaki.cli.config import Config
170

    
171
        content0 = list(self.config_file_content)
172

    
173
        def make_file(lines):
174
            f = NamedTemporaryFile()
175
            f.writelines(lines)
176
            f.flush()
177
            return f
178

    
179
        with make_file(content0) as f:
180
            _cnf = Config(path=f.name)
181
            self.assertEqual([], _cnf.rescue_old_file())
182
        del _cnf
183

    
184
        content1, sample = list(content0), 'xyz_cli = XYZ_specs'
185
        content1.insert(2, '%s\n' % sample)
186

    
187
        with make_file(content1) as f:
188
            f.seek(0)
189
            _cnf = Config(path=f.name)
190
            self.assertEqual(
191
                sorted(['global.%s' % sample]), sorted(_cnf.rescue_old_file()))
192
        del _cnf
193

    
194
        content2, sample = list(content0), 'http://www.example2.org'
195
        content2.insert(2, 'url = %s\n' % sample)
196
        err = StringIO()
197

    
198
        with make_file(content2) as f:
199
            _cnf = Config(path=f.name)
200
            self.assertEqual([], _cnf.rescue_old_file(err=err))
201
            self.assertEqual(
202
                '... rescue global.url => cloud.default.url\n', err.getvalue())
203
            self.assertEqual(sample, _cnf.get_cloud('default', 'url'))
204
        del _cnf
205

    
206
        content3 = list(content0)
207
        content3.insert(
208
            2, 'url = http://example1.com\nurl = http://example2.com\n')
209

    
210
        with make_file(content3) as f:
211
            _cnf = Config(path=f.name)
212
            self.assertEqual([], _cnf.rescue_old_file(err=err))
213
            self.assertEqual(
214
                2 * '... rescue global.url => cloud.default.url\n',
215
                err.getvalue())
216
            self.assertEqual(
217
                'http://example2.com', _cnf.get_cloud('default', 'url'))
218
        del _cnf
219

    
220
        content4 = list(content0)
221
        content4.insert(2, 'url = http://example1.com\n')
222
        content4.append('\n[cloud "default"]\nurl=http://example2.com\n')
223

    
224
        with make_file(content4) as f:
225
            _cnf = Config(path=f.name)
226
            from kamaki.cli.errors import CLISyntaxError
227
            self.assertRaises(CLISyntaxError, _cnf.rescue_old_file)
228
        del _cnf
229

    
230
        content5 = list(content0)
231
        extras = [
232
            ('pithos_cli', 'pithos'), ('store_cli', 'pithos'),
233
            ('storage_cli', 'pithos'), ('compute_cli', 'cyclades'),
234
            ('cyclades_cli', 'cyclades')]
235
        for sample in extras:
236
            content5.insert(2, '%s = %s\n' % sample)
237

    
238
        with make_file(content5) as f:
239
            _cnf = Config(path=f.name)
240
            self.assertEqual(
241
                sorted(['global.%s = %s' % sample for sample in extras]),
242
                 sorted(_cnf.rescue_old_file()))
243

    
244
    def test_guess_version(self):
245
        from kamaki.cli.config import Config
246
        from kamaki.cli.logger import add_file_logger
247

    
248
        def make_log_file():
249
            f = NamedTemporaryFile()
250
            add_file_logger('kamaki.cli.config', filename=f.name)
251
            return f
252

    
253
        def make_file(lines):
254
            f = NamedTemporaryFile()
255
            f.writelines(lines)
256
            f.flush()
257
            return f
258

    
259
        with make_file([]) as f:
260
            with make_log_file() as logf:
261
                _cnf = Config(path=f.name)
262
                self.assertEqual(0.9, _cnf.guess_version())
263
                exp = 'All heuristics failed, cannot decide\n'
264
                logf.file.seek(- len(exp), 2)
265
                self.assertEqual(exp, logf.read())
266

    
267
        content0 = list(self.config_file_content)
268

    
269
        with make_file(content0) as f:
270
            with make_log_file() as logf:
271
                _cnf = Config(path=f.name)
272
                self.assertEqual(0.9, _cnf.guess_version())
273
                exp = '... found cloud "demo"\n'
274
                logf.seek(- len(exp), 2)
275
                self.assertEqual(exp, logf.read())
276

    
277
        for term in ('url', 'token'):
278
            content1 = list(content0)
279
            content1.insert(2, '%s = some_value' % term)
280

    
281
            with make_file(content1) as f:
282
                with make_log_file() as logf:
283
                    _cnf = Config(path=f.name)
284
                    self.assertEqual(0.8, _cnf.guess_version())
285
                    exp = '..... config file has an old global section\n'
286
                    logf.seek(- len(exp), 2)
287
                    self.assertEqual(exp, logf.read())
288

    
289
    def test_get_cloud(self):
290
        from kamaki.cli.config import Config, CLOUD_PREFIX
291

    
292
        _cnf = Config(path=self.f.name)
293
        d = dict(opt1='v1', opt2='v2')
294
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
295
            self.assertEqual('v1', _cnf.get_cloud('mycloud', 'opt1'))
296
            self.assertEqual(
297
                get.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud'))
298
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt3')
299
        with patch('kamaki.cli.config.Config.get', return_value=0) as get:
300
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt1')
301

    
302
    @patch('kamaki.cli.config.Config.set')
303
    def test_set_cloud(self, c_set):
304
        from kamaki.cli.config import Config, CLOUD_PREFIX
305
        _cnf = Config(path=self.f.name)
306

    
307
        d = dict(k='v')
308
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
309
            _cnf.set_cloud('mycloud', 'opt', 'val')
310
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
311
            d['opt'] = 'val'
312
            self.assertEqual(
313
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
314

    
315
        with patch('kamaki.cli.config.Config.get', return_value=None) as get:
316
            _cnf.set_cloud('mycloud', 'opt', 'val')
317
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
318
            d = dict(opt='val')
319
            self.assertEqual(
320
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
321

    
322
        with patch(
323
                'kamaki.cli.config.Config.get', side_effect=KeyError()) as get:
324
            _cnf.set_cloud('mycloud', 'opt', 'val')
325
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
326
            d = dict(opt='val')
327
            self.assertEqual(
328
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
329

    
330
    def test__load_defaults(self):
331
        from kamaki.cli.config import Config, DEFAULTS
332
        _cnf = Config(path=self.f.name)
333

    
334
        with patch('kamaki.cli.config.Config.set') as c_set:
335
            _cnf._load_defaults()
336
            for i, (section, options) in enumerate(DEFAULTS.items()):
337
                for j, (option, val) in enumerate(options.items()):
338
                    self.assertEqual(
339
                        c_set.mock_calls[(i + 1) * j],
340
                        call(section, option, val))
341

    
342
    def test__get_dict(self):
343
        from kamaki.cli.config import Config, CLOUD_PREFIX, DEFAULTS
344

    
345
        def make_file(lines):
346
            f = NamedTemporaryFile()
347
            f.writelines(lines)
348
            f.flush()
349
            return f
350

    
351
        with make_file([]) as f:
352
            _cnf = Config(path=f.name)
353
            for term in ('global', CLOUD_PREFIX):
354
                self.assertEqual(DEFAULTS[term], _cnf._get_dict(term))
355
            for term in ('nosection', ''):
356
                self.assertEqual({}, _cnf._get_dict(term))
357

    
358
        with make_file(self.config_file_content) as f:
359
            _cnf = Config(path=f.name)
360
            for term in ('global', CLOUD_PREFIX):
361
                self.assertNotEqual(DEFAULTS[term], _cnf._get_dict(term))
362

    
363
    def test_reload(self):
364
        from kamaki.cli.config import Config
365
        _cnf = Config(path=self.f.name)
366

    
367
        with patch('kamaki.cli.config.Config.__init__') as i:
368
            _cnf.reload()
369
            i.assert_called_once_with(self.f.name)
370

    
371
    @patch('kamaki.cli.config.Config.get_cloud', return_value='get cloud')
372
    def test_get(self, get_cloud):
373
        from kamaki.cli.config import Config
374
        _cnf = Config(path=self.f.name)
375
        self.assertEqual('pithos', _cnf.get('global', 'file_cli'))
376
        self.assertEqual(get_cloud.mock_calls, [])
377
        for opt, sec in (('cloud', 'non-existing'), ('non-opt', 'exists')):
378
            self.assertEqual(None, _cnf.get(opt, sec))
379
            self.assertEqual(get_cloud.mock_calls, [])
380
        self.assertEqual('get cloud', _cnf.get('cloud.demo', 'url'))
381
        self.assertEqual(get_cloud.mock_calls[-1], call('demo', 'url'))
382

    
383
    def test_set(self):
384
        from kamaki.cli.config import Config, CLOUD_PREFIX
385
        _cnf = Config(path=self.f.name)
386

    
387
        with patch(
388
                'kamaki.cli.config.Config._cloud_name',
389
                return_value='cn') as _cloud_name:
390
            with patch(
391
                    'kamaki.cli.config.Config.set_cloud',
392
                    return_value='sc') as set_cloud:
393
                self.assertEqual(
394
                    'sc', _cnf.set('%s.sec' % CLOUD_PREFIX, 'opt', 'val'))
395
                self.assertEqual(
396
                    _cloud_name.mock_calls[-1],
397
                    call('%s "sec"' % CLOUD_PREFIX))
398
                self.assertEqual(
399
                    set_cloud.mock_calls[-1], call('cn', 'opt', 'val'))
400

    
401
                self.assertTrue(len(_cnf.items('global')) > 0)
402
                self.assertEqual(None, _cnf.set('global', 'opt', 'val'))
403
                self.assertTrue(('opt', 'val') in _cnf.items('global'))
404

    
405
                self.assertTrue(len(_cnf.items('new')) == 0)
406
                self.assertEqual(None, _cnf.set('new', 'opt', 'val'))
407
                self.assertTrue(('opt', 'val') in _cnf.items('new'))
408

    
409
    def test_remove_option(self):
410
        from kamaki.cli.config import Config
411
        _cnf = Config(path=self.f.name)
412

    
413
        self.assertEqual(len(_cnf.items('no-section')), 0)
414
        _cnf.remove_option('no-section', 'opt', False)
415
        self.assertEqual(len(_cnf.items('no-section')), 0)
416
        _cnf.remove_option('no-section', 'opt', True)
417
        self.assertEqual(len(_cnf.items('no-section')), 0)
418

    
419
        opt_num = len(_cnf.items('global'))
420
        self.assertTrue(opt_num > 0)
421
        _cnf.remove_option('global', 'file_cli', False)
422
        self.assertEqual(len(_cnf.items('global')), opt_num)
423
        _cnf.remove_option('global', 'file_cli', True)
424
        self.assertEqual(len(_cnf.items('global')), opt_num - 1)
425

    
426
        _cnf.set('global', 'server_cli', 'alt-server')
427
        self.assertTrue(('server_cli', 'alt-server') in _cnf.items('global'))
428
        self.assertFalse(('server_cli', 'cyclades') in _cnf.items('global'))
429
        _cnf.remove_option('global', 'server_cli', False)
430
        self.assertFalse(('server_cli', 'alt-server') in _cnf.items('global'))
431
        self.assertTrue(('server_cli', 'cyclades') in _cnf.items('global'))
432
        _cnf.remove_option('global', 'server_cli', True)
433
        self.assertFalse(('server_cli', 'alt-server') in _cnf.items('global'))
434
        self.assertFalse(('server_cli', 'cyclades') in _cnf.items('global'))
435

    
436
    def test_remove_from_cloud(self):
437
        from kamaki.cli.config import Config, CLOUD_PREFIX
438
        _cnf = Config(path=self.f.name)
439

    
440
        d = dict(k1='v1', k2='v2')
441
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
442
            _cnf.remove_from_cloud('cld', 'k1')
443
            self.assertEqual(d, dict(k2='v2'))
444
            self.assertRaises(KeyError, _cnf.remove_from_cloud, 'cld', 'opt')
445
            self.assertEqual(get.mock_calls, 2 * [call(CLOUD_PREFIX, 'cld')])
446

    
447
    @patch(
448
        'kamaki.cli.config.Config._get_dict',
449
        return_value={'k1': 'v1', 'k2': 'v2'})
450
    def test_keys(self, _get_dict):
451
        from kamaki.cli.config import Config
452
        _cnf = Config(path=self.f.name)
453

    
454
        self.assertEqual(
455
            sorted(['k1', 'k2']), sorted(_cnf.keys('opt', 'boolean')))
456
        _get_dict.assert_called_once_with('opt', 'boolean')
457

    
458
    @patch(
459
        'kamaki.cli.config.Config._get_dict',
460
        return_value={'k1': 'v1', 'k2': 'v2'})
461
    def test_items(self, _get_dict):
462
        from kamaki.cli.config import Config
463
        _cnf = Config(path=self.f.name)
464

    
465
        self.assertEqual(
466
            sorted([('k1', 'v1'), ('k2', 'v2')]),
467
            sorted(_cnf.items('opt', 'boolean')))
468
        _get_dict.assert_called_once_with('opt', 'boolean')
469

    
470
    def test_override(self):
471
        from kamaki.cli.config import Config
472
        _cnf = Config(path=self.f.name)
473

    
474
        _cnf.override('sec', 'opt', 'val')
475
        self.assertEqual(_cnf._overrides['sec']['opt'], 'val')
476

    
477
    def test_write(self):
478
        from kamaki.cli.config import Config, DEFAULTS
479
        _cnf = Config(path=self.f.name)
480

    
481
        exp = '%s[global]\n' % HEADER
482
        exp += ''.join([
483
            '%s = %s\n' % (k, v) for k, v in DEFAULTS['global'].items()])
484
        exp += '\n'
485

    
486
        _cnf.write()
487
        self.f.seek(0)
488
        self.assertEqual(self.f.read(), exp)
489

    
490
        del _cnf
491
        with NamedTemporaryFile() as f:
492
            f.write('\n'.join(self.config_file_content))
493
            f.flush()
494
            _cnf = Config(path=f.name)
495
            f.seek(0)
496
            self.assertEqual(f.read(), '\n'.join(self.config_file_content))
497
            _cnf.write()
498
            f.seek(0)
499
            file_contents = f.read()
500
            for line in self.config_file_content:
501
                self.assertTrue(line in file_contents)
502
            _cnf.set('sec', 'opt', 'val')
503
            _cnf.set('global', 'opt', 'val')
504
            _cnf.set('global', 'file_cli', 'val')
505
            _cnf.write()
506
            f.seek(0)
507
            file_contents = f.read()
508
            for line in ('file_cli = val\n', '[sec]\n', 'opt = val\n'):
509
                self.assertTrue(line in file_contents)
510

    
511

    
512
if __name__ == '__main__':
513
    from sys import argv
514
    from kamaki.cli.test import runTestCase
515
    runTestCase(Config, 'Config', argv[1:])