Revision bcd6c6e6 kamaki/clients/connection/test.py
b/kamaki/clients/connection/test.py | ||
---|---|---|
155 | 155 |
def setUp(self): |
156 | 156 |
self.conn = kamakicon.KamakiHTTPConnection() |
157 | 157 |
|
158 |
def test__retrieve_connection_info(self): |
|
159 |
async_params = dict(param1='val1', param2=None, param3=42) |
|
160 |
r = self.conn._retrieve_connection_info(async_params) |
|
161 |
self.assertEquals(r, ('http', '127.0.0.1')) |
|
162 |
expected = '?%s' % '&'.join([( |
|
163 |
'%s=%s' % (k, v)) if v else ( |
|
164 |
'%s' % k) for k, v in async_params.items()]) |
|
165 |
self.assertEquals('http://127.0.0.1%s' % expected, self.conn.url) |
|
166 |
|
|
167 |
for schnet in ( |
|
168 |
('http', 'www.example.com'), ('https', 'www.example.com'), |
|
169 |
('ftp', 'www.example.com'), ('ftps', 'www.example.com'), |
|
170 |
('http', 'www.example.com/v1'), ('https', 'www.example.com/v1')): |
|
171 |
self.conn = kamakicon.KamakiHTTPConnection(url='%s://%s' % schnet) |
|
172 |
self.conn.url = '%s://%s' % schnet |
|
173 |
r = self.conn._retrieve_connection_info(async_params) |
|
174 |
if schnet[1].endswith('v1'): |
|
175 |
self.assertEquals(r, (schnet[0], schnet[1][:-3])) |
|
176 |
else: |
|
177 |
self.assertEquals(r, schnet) |
|
178 |
self.assertEquals( |
|
179 |
'%s://%s/%s' % (schnet[0], schnet[1], expected), |
|
180 |
self.conn.url) |
|
181 |
|
|
158 | 182 |
def test_perform_request(self): |
159 | 183 |
from httplib import HTTPConnection |
160 | 184 |
from objpool import http |
Also available in: Unified diff