Statistics
| Branch: | Tag: | Revision:

root / kamaki / cli / config / test.py @ 2e616dbd

History | View | Annotate | Download (14.2 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from mock import patch, call
35
from unittest import TestCase
36
from itertools import product
37
import os
38
from tempfile import NamedTemporaryFile
39
from io import StringIO
40

    
41

    
42
def _2steps_gen(limit=2):
43
    counter, ret = 0, None
44
    while True:
45
        if counter >= limit:
46
            ret = None if ret else 'something'
47
            counter = 0
48
        counter += 1
49
        yield ret
50

    
51
_2value_gen = _2steps_gen()
52

    
53

    
54
class Config(TestCase):
55
    """Test Config methods"""
56

    
57
    config_file_content = [
58
        '#kamaki config file version 0.9\n',
59
        '[global]\n',
60
        'max_threads = 5\n',
61
        'default_cloud = ~mycloud\n',
62
        'file_cli = pithos\n',
63
        'history_file = /home/user/.kamaki.history\n',
64
        'colors = off\n',
65
        'config_cli = config\n',
66
        'history_cli = history\n',
67
        'log_token = off\n',
68
        'server_cli = cyclades\n',
69
        'user_cli = astakos\n',
70
        'log_data = off\n',
71
        'flavor_cli = cyclades\n',
72
        'image_cli = image\n',
73
        'log_file = /home/user/.kamaki.log\n',
74
        'network_cli = cyclades\n',
75
        'log_pid = off\n',
76
        '\n',
77
        '[cloud "demo"]\n',
78
        'url = https://demo.example.com\n',
79
        'token = t0k3n-0f-d3m0-3x4mp13\n',
80
        '\n',
81
        '[cloud "~mycloud"]\n',
82
        'url = https://example.com\n',
83
        'pithos_container = images\n']
84

    
85
    def setUp(self):
86
        self.f = NamedTemporaryFile()
87

    
88
    def readDown(self):
89
        try:
90
            self.f.close()
91
        except Exception:
92
            pass
93

    
94
    @patch('kamaki.cli.config.Config.remove_section')
95
    @patch('kamaki.cli.config.Config.items', return_value=(
96
        ('k1', 'v1'), ('k2', 'v2')))
97
    @patch('kamaki.cli.config.Config.sections', return_value=('a', 'b'))
98
    @patch('kamaki.cli.config.Config.set_cloud')
99
    @patch('kamaki.cli.config.Config.read')
100
    @patch('kamaki.cli.config.Config._load_defaults')
101
    def test___init__(
102
            self, _ld, c_read, c_set_cloud, c_sections, c_items,
103
            c_remove_section):
104
        from kamaki.cli.config import (
105
            Config, RawConfigParser, CONFIG_ENV, CONFIG_PATH)
106
        _ld_num, c_sections_num, c_items_num, c_set_cloud_num = 0, 0, 0, 0
107
        c_remove_section_num, gen_call = 0, [call('a'), call('b')]
108
        for path, with_defaults in product((None, '/a/path'), (True, False)):
109
            with patch(
110
                    'kamaki.cli.config.Config._cloud_name',
111
                    return_value=_2value_gen.next()) as _cloud_name:
112
                cnf = Config(path=path, with_defaults=with_defaults)
113
                self.assertTrue(isinstance(cnf, RawConfigParser))
114
                cpath = path or os.environ.get(CONFIG_ENV, CONFIG_PATH)
115
                self.assertEqual(cnf.path, cpath)
116
                if with_defaults:
117
                    _ld_num += 1
118
                    self.assertEqual(_ld.mock_calls[-1], call())
119
                self.assertEqual(len(_ld.mock_calls), _ld_num)
120
                self.assertEqual(c_read.mock_calls[-1], call(cpath))
121

    
122
                c_sections_num += 1
123
                self.assertEqual(len(c_sections.mock_calls), c_sections_num)
124
                self.assertEqual(c_sections.mock_calls[-1], call())
125

    
126
                self.assertEqual(_cloud_name.mock_calls, gen_call)
127

    
128
                r = _2value_gen.next()
129
                if r:
130
                    c_items_num += 2
131
                    self.assertEqual(c_items.mock_calls[-2:], gen_call)
132
                    c_set_cloud_num += 4
133
                    self.assertEqual(c_set_cloud.mock_calls[-4:], [
134
                        call(r, 'k1', 'v1'), call(r, 'k2', 'v2')] * 2)
135
                    c_remove_section_num += 2
136
                    self.assertEqual(
137
                        c_remove_section.mock_calls[-2:], gen_call)
138
                self.assertEqual(len(c_items.mock_calls), c_items_num)
139
                self.assertEqual(len(c_set_cloud.mock_calls), c_set_cloud_num)
140
                self.assertEqual(
141
                    len(c_remove_section.mock_calls), c_remove_section_num)
142

    
143
    def test__cloud_name(self):
144
        from kamaki.cli.config import (
145
            Config, CLOUD_PREFIX, InvalidCloudNameError)
146
        cn = Config._cloud_name
147
        self.assertEqual(cn('non%s name' % CLOUD_PREFIX), None)
148
        for invalid in ('"!@#$%^&())_"', '"a b c"', u'"\xce\xcd"', 'naked'):
149
            self.assertRaises(
150
                InvalidCloudNameError, cn, '%s %s' % (CLOUD_PREFIX, invalid))
151
        for valid in ('word', '~okeanos', 'd0t.ted', 'ha$h#ed'):
152
            self.assertEqual(cn('%s "%s"' % (CLOUD_PREFIX, valid)), valid)
153

    
154
    def test_rescue_old_file(self):
155
        from kamaki.cli.config import Config
156

    
157
        content0 = list(self.config_file_content)
158

    
159
        def make_file(lines):
160
            f = NamedTemporaryFile()
161
            f.writelines(lines)
162
            f.flush()
163
            return f
164

    
165
        with make_file(content0) as f:
166
            _cnf = Config(path=f.name)
167
            self.assertEqual([], _cnf.rescue_old_file())
168
        del _cnf
169

    
170
        content1, sample = list(content0), 'xyz_cli = XYZ_specs'
171
        content1.insert(2, '%s\n' % sample)
172

    
173
        with make_file(content1) as f:
174
            _cnf = Config(path=f.name)
175
            self.assertEqual(['global.%s' % sample], _cnf.rescue_old_file())
176
        del _cnf
177

    
178
        content2, sample = list(content0), 'http://www.example2.org'
179
        content2.insert(2, 'url = %s\n' % sample)
180
        err = StringIO()
181

    
182
        with make_file(content2) as f:
183
            _cnf = Config(path=f.name)
184
            self.assertEqual([], _cnf.rescue_old_file(err=err))
185
            self.assertEqual(
186
                '... rescue global.url => cloud.default.url\n', err.getvalue())
187
            self.assertEqual(sample, _cnf.get_cloud('default', 'url'))
188
        del _cnf
189

    
190
        content3 = list(content0)
191
        content3.insert(
192
            2, 'url = http://example1.com\nurl = http://example2.com\n')
193

    
194
        with make_file(content3) as f:
195
            _cnf = Config(path=f.name)
196
            self.assertEqual([], _cnf.rescue_old_file(err=err))
197
            self.assertEqual(
198
                2 * '... rescue global.url => cloud.default.url\n',
199
                err.getvalue())
200
            self.assertEqual(
201
                'http://example2.com', _cnf.get_cloud('default', 'url'))
202
        del _cnf
203

    
204
        content4 = list(content0)
205
        content4.insert(2, 'url = http://example1.com\n')
206
        content4.append('\n[cloud "default"]\nurl=http://example2.com\n')
207

    
208
        with make_file(content4) as f:
209
            _cnf = Config(path=f.name)
210
            from kamaki.cli.errors import CLISyntaxError
211
            self.assertRaises(CLISyntaxError, _cnf.rescue_old_file)
212
        del _cnf
213

    
214
        content5 = list(content0)
215
        extras = [
216
            ('pithos_cli', 'pithos'), ('store_cli', 'pithos'),
217
            ('storage_cli', 'pithos'), ('compute_cli', 'cyclades'),
218
            ('cyclades_cli', 'cyclades')]
219
        for sample in extras:
220
            content5.insert(2, '%s = %s\n' % sample)
221

    
222
        with make_file(content5) as f:
223
            _cnf = Config(path=f.name)
224
            self.assertEqual(
225
                sorted(['global.%s = %s' % sample for sample in extras]),
226
                 sorted(_cnf.rescue_old_file()))
227

    
228
    def test_guess_version(self):
229
        from kamaki.cli.config import Config
230
        from kamaki.cli.logger import add_file_logger
231

    
232
        def make_log_file():
233
            f = NamedTemporaryFile()
234
            add_file_logger('kamaki.cli.config', filename=f.name)
235
            return f
236

    
237
        def make_file(lines):
238
            f = NamedTemporaryFile()
239
            f.writelines(lines)
240
            f.flush()
241
            return f
242

    
243
        with make_file([]) as f:
244
            with make_log_file() as logf:
245
                _cnf = Config(path=f.name)
246
                self.assertEqual(0.9, _cnf.guess_version())
247
                exp = 'All heuristics failed, cannot decide\n'
248
                logf.file.seek(- len(exp), 2)
249
                self.assertEqual(exp, logf.read())
250

    
251
        content0 = list(self.config_file_content)
252

    
253
        with make_file(content0) as f:
254
            with make_log_file() as logf:
255
                _cnf = Config(path=f.name)
256
                self.assertEqual(0.9, _cnf.guess_version())
257
                exp = '... found cloud "demo"\n'
258
                logf.seek(- len(exp), 2)
259
                self.assertEqual(exp, logf.read())
260

    
261
        for term in ('url', 'token'):
262
            content1 = list(content0)
263
            content1.insert(2, '%s = some_value' % term)
264

    
265
            with make_file(content1) as f:
266
                with make_log_file() as logf:
267
                    _cnf = Config(path=f.name)
268
                    self.assertEqual(0.8, _cnf.guess_version())
269
                    exp = '..... config file has an old global section\n'
270
                    logf.seek(- len(exp), 2)
271
                    self.assertEqual(exp, logf.read())
272

    
273
    def test_get_cloud(self):
274
        from kamaki.cli.config import Config, CLOUD_PREFIX
275

    
276
        _cnf = Config(path=self.f.name)
277
        d = dict(opt1='v1', opt2='v2')
278
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
279
            self.assertEqual('v1', _cnf.get_cloud('mycloud', 'opt1'))
280
            self.assertEqual(
281
                get.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud'))
282
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt3')
283
        with patch('kamaki.cli.config.Config.get', return_value=0) as get:
284
            self.assertRaises(KeyError, _cnf.get_cloud, 'mycloud', 'opt1')
285

    
286
    def test_get_global(self):
287
        from kamaki.cli.config import Config
288

    
289
        _cnf = Config(path=self.f.name)
290
        with patch('kamaki.cli.config.Config.get', return_value='val') as get:
291
            self.assertEqual('val', _cnf.get_global('opt'))
292
            get.assert_called_once_with('global', 'opt')
293

    
294
    @patch('kamaki.cli.config.Config.set')
295
    def test_set_cloud(self, c_set):
296
        from kamaki.cli.config import Config, CLOUD_PREFIX
297
        _cnf = Config(path=self.f.name)
298

    
299
        d = dict(k='v')
300
        with patch('kamaki.cli.config.Config.get', return_value=d) as get:
301
            _cnf.set_cloud('mycloud', 'opt', 'val')
302
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
303
            d['opt'] = 'val'
304
            self.assertEqual(
305
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
306

    
307
        with patch('kamaki.cli.config.Config.get', return_value=None) as get:
308
            _cnf.set_cloud('mycloud', 'opt', 'val')
309
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
310
            d = dict(opt='val')
311
            self.assertEqual(
312
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
313

    
314
        with patch(
315
                'kamaki.cli.config.Config.get', side_effect=KeyError()) as get:
316
            _cnf.set_cloud('mycloud', 'opt', 'val')
317
            get.assert_called_once_with(CLOUD_PREFIX, 'mycloud')
318
            d = dict(opt='val')
319
            self.assertEqual(
320
                c_set.mock_calls[-1], call(CLOUD_PREFIX, 'mycloud', d))
321

    
322
    def test_set_global(self):
323
        from kamaki.cli.config import Config
324
        _cnf = Config(path=self.f.name)
325

    
326
        with patch('kamaki.cli.config.Config.set') as c_set:
327
            _cnf.set_global('opt', 'val')
328
            c_set.assert_called_once_with('global', 'opt', 'val')
329

    
330
    def test__load_defaults(self):
331
        from kamaki.cli.config import Config, DEFAULTS
332
        _cnf = Config(path=self.f.name)
333

    
334
        with patch('kamaki.cli.config.Config.set') as c_set:
335
            _cnf._load_defaults()
336
            for i, (section, options) in enumerate(DEFAULTS.items()):
337
                for j, (option, val) in enumerate(options.items()):
338
                    self.assertEqual(
339
                        c_set.mock_calls[(i + 1) * j],
340
                        call(section, option, val))
341

    
342
    def test__get_dict(self):
343
        from kamaki.cli.config import Config, CLOUD_PREFIX, DEFAULTS
344

    
345
        def make_file(lines):
346
            f = NamedTemporaryFile()
347
            f.writelines(lines)
348
            f.flush()
349
            return f
350

    
351
        with make_file([]) as f:
352
            _cnf = Config(path=f.name)
353
            for term in ('global', CLOUD_PREFIX):
354
                self.assertEqual(DEFAULTS[term], _cnf._get_dict(term))
355
            for term in ('nosection', ''):
356
                self.assertEqual({}, _cnf._get_dict(term))
357

    
358
        with make_file(self.config_file_content) as f:
359
            _cnf = Config(path=f.name)
360
            for term in ('global', CLOUD_PREFIX):
361
                self.assertNotEqual(DEFAULTS[term], _cnf._get_dict(term))
362

    
363
    def test_reload(self):
364
        from kamaki.cli.config import Config
365
        _cnf = Config(path=self.f.name)
366

    
367
        with patch('kamaki.cli.config.Config.__init__') as i:
368
            _cnf.reload()
369
            i.assert_called_once_with(self.f.name)
370

    
371

    
372
if __name__ == '__main__':
373
    from sys import argv
374
    from kamaki.cli.test import runTestCase
375
    runTestCase(Config, 'Config', argv[1:])