Revision a8685edf kamaki/clients/livetest/__init__.py
b/kamaki/clients/livetest/__init__.py | ||
---|---|---|
169 | 169 |
return parser |
170 | 170 |
|
171 | 171 |
|
172 |
class Connection(Generic): |
|
173 |
|
|
174 |
def setUp(self): |
|
175 |
from kamaki.clients.connection import kamakicon |
|
176 |
self.conn = kamakicon.KamakiHTTPConnection |
|
177 |
self.resp = kamakicon.KamakiHTTPResponse |
|
178 |
self.rerr = kamakicon.KamakiResponseError |
|
179 |
self.url = self['store', 'url'] |
|
180 |
self.token = self['store', 'token'] |
|
181 |
|
|
182 |
def test_000(self): |
|
183 |
exceptions_left = 9 |
|
184 |
from random import randrange |
|
185 |
from httplib import HTTPSConnection |
|
186 |
from mock import patch |
|
187 |
connection = self.conn(url=self.url) |
|
188 |
tries = 0 |
|
189 |
while exceptions_left: |
|
190 |
from time import sleep |
|
191 |
sleep(0.5) |
|
192 |
tries += 1 |
|
193 |
print('Try connection #%s' % tries) |
|
194 |
response = connection.perform_request('GET', async_headers={ |
|
195 |
'X-Auth-Token': self.token}) |
|
196 |
with patch.object(response.request, 'close', return_value=None): |
|
197 |
if randrange(2): |
|
198 |
response.release() |
|
199 |
#self.assertTrue(len(text) > 0) |
|
200 |
else: |
|
201 |
print('\tRaise artificial exception (%s left)' % ( |
|
202 |
exceptions_left - 1)) |
|
203 |
with patch.object( |
|
204 |
HTTPSConnection, |
|
205 |
'getresponse', |
|
206 |
side_effect=self.rerr('A random error')): |
|
207 |
try: |
|
208 |
response.text |
|
209 |
except self.rerr: |
|
210 |
exceptions_left -= 1 |
|
211 |
else: |
|
212 |
assert False, 'Exception not raised as expected' |
|
213 |
response.request.close.assert_called_once_with() |
|
214 |
response.request.close() |
|
215 |
|
|
216 |
|
|
172 | 217 |
def main(argv, config=None): |
173 | 218 |
suiteFew = TestSuite() |
219 |
if len(argv) == 0 or argv[0] == 'connection': |
|
220 |
test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000') |
|
221 |
suiteFew.addTest(Connection(test_method, config)) |
|
174 | 222 |
if len(argv) == 0 or argv[0] == 'pithos': |
175 | 223 |
from kamaki.clients.livetest.pithos import Pithos |
176 | 224 |
test_method = 'test_%s' % (argv[1] if len(argv) > 1 else '000') |
Also available in: Unified diff