Fix pep8 compliance issues everywhere
[kamaki] / kamaki / clients / tests / __init__.py
1 # Copyright 2012-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import inspect
35 from unittest import TestCase, TestSuite, TextTestRunner
36 from argparse import ArgumentParser
37 from sys import stdout
38 from progress.bar import ShadyBar
39
40 from kamaki.cli.config import Config
41 from kamaki.cli.utils import spiner
42
43
44 def _add_value(foo, value):
45     def wrap(self):
46         return foo(self, value)
47     return wrap
48
49
50 class Generic(TestCase):
51
52     _waits = []
53     _cnf = None
54     _grp = None
55     _fetched = {}
56
57     def __init__(self, specific=None, config=None, group=None):
58         super(Generic, self).__init__(specific)
59         self._cnf = config or Config()
60         self._grp = group
61         self._waits.append(0.71828)
62         for i in range(10):
63             self._waits.append(self._waits[-1] * 2.71828)
64
65     def __getitem__(self, key):
66         key = self._key(key)
67         try:
68             return self._fetched[key]
69         except KeyError:
70             return self._get_from_cnf(key)
71
72     def _key(self, key):
73         return ('', key) if isinstance(key, str)\
74             else ('', key[0]) if len(key) == 1\
75             else key
76
77     def _get_from_cnf(self, key):
78         val = 0
79         if key[0]:
80             val = self._cnf.get('test', '%s_%s' % key)\
81                 or self._cnf.get(*key)
82         if not val:
83             val = self._cnf.get('test', key[1])\
84                 or self._cnf.get('global', key[1])
85         self._fetched[key] = val
86         return val
87
88     def _safe_progress_bar(self, msg):
89         """Try to get a progress bar, but do not raise errors"""
90         try:
91             wait_bar = ShadyBar(msg)
92
93             def wait_gen(n):
94                 for i in wait_bar.iter(range(int(n))):
95                     yield
96                 yield
97             wait_cb = wait_gen
98         except Exception:
99             stdout.write('%s:' % msg)
100             (wait_bar, wait_cb) = (None, spiner)
101         return (wait_bar, wait_cb)
102
103     def _safe_progress_bar_finish(self, progress_bar):
104         try:
105             progress_bar.finish()
106         except Exception:
107             print(' DONE')
108
109     def do_with_progress_bar(self, action, msg, items):
110         if not items:
111             print('%s: DONE' % msg)
112             return
113         (action_bar, action_cb) = self._safe_progress_bar(msg)
114         action_gen = action_cb(len(items))
115         try:
116             action_gen.next()
117         except Exception:
118             pass
119         for item in items:
120             action(item)
121             action_gen.next()
122         self._safe_progress_bar_finish(action_bar)
123
124     def assert_dicts_are_deeply_equal(self, d1, d2):
125         (st1, st2) = (set(d1.keys()), set(d2.keys()))
126         diff1 = st1.difference(st2)
127         diff2 = st2.difference(st1)
128         self.assertTrue(
129             not (diff1 or diff2),
130             'Key differences:\n\td1-d2=%s\n\td2-d1=%s' % (diff1, diff2))
131         for k, v in d1.items():
132             if isinstance(v, dict):
133                 self.assert_dicts_are_deeply_equal(v, d2[k])
134             else:
135                 self.assertEqual(unicode(v), unicode(d2[k]))
136
137     def test_000(self):
138         print('')
139         methods = [method for method in inspect.getmembers(
140             self,
141             predicate=inspect.ismethod) if method[0].startswith('_test_')]
142         failures = 0
143         for method in methods:
144             stdout.write('Test %s ' % method[0][6:])
145             stdout.flush()
146             try:
147                 method[1]()
148                 print(' ...ok')
149             except AssertionError:
150                 print('  FAIL: %s (%s)' % (method[0], method[1]))
151                 failures += 1
152         if failures:
153             raise AssertionError('%s failures' % failures)
154
155
156 def init_parser():
157     parser = ArgumentParser(add_help=False)
158     parser.add_argument(
159         '-h', '--help',
160         dest='help',
161         action='store_true',
162         default=False,
163         help="Show this help message and exit")
164     return parser
165
166
167 def main(argv):
168     _main(argv, config=None)
169
170
171 def _main(argv, config=None):
172     suiteFew = TestSuite()
173     if len(argv) == 0 or argv[0] == 'pithos':
174         from kamaki.clients.tests.pithos import Pithos
175         test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000')
176         suiteFew.addTest(Pithos(test_method, config))
177     if len(argv) == 0 or argv[0] == 'cyclades':
178         from kamaki.clients.tests.cyclades import Cyclades
179         test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000')
180         suiteFew.addTest(Cyclades(test_method, config))
181     if len(argv) == 0 or argv[0] == 'image':
182         from kamaki.clients.tests.image import Image
183         test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000')
184         suiteFew.addTest(Image(test_method, config))
185     if len(argv) == 0 or argv[0] == 'astakos':
186         from kamaki.clients.tests.astakos import Astakos
187         test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000')
188         suiteFew.addTest(Astakos(test_method, config))
189
190     TextTestRunner(verbosity=2).run(suiteFew)
191
192 if __name__ == '__main__':
193     parser = init_parser()
194     args, argv = parser.parse_known_args()
195     if len(argv) > 2 or getattr(args, 'help') or len(argv) < 1:
196         raise Exception('\tusage: tests <group> [command]')
197     main(argv)