Revision c2b5da2f kamaki/clients/__init__.py
b/kamaki/clients/__init__.py | ||
---|---|---|
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 | 34 |
from urllib2 import quote |
35 |
from urlparse import urlparse |
|
35 | 36 |
from threading import Thread |
36 | 37 |
from json import dumps, loads |
37 | 38 |
from time import time |
39 |
from httplib import ResponseNotReady |
|
40 |
from time import sleep |
|
41 |
from random import random |
|
42 |
|
|
43 |
from objpool.http import PooledHTTPConnection |
|
38 | 44 |
|
39 | 45 |
from kamaki.clients.utils import logger |
40 |
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection |
|
41 |
from kamaki.clients.connection.errors import KamakiConnectionError |
|
42 |
from kamaki.clients.connection.errors import KamakiResponseError |
|
43 | 46 |
|
44 | 47 |
LOG_TOKEN = False |
45 | 48 |
DEBUG_LOG = logger.get_log_filename() |
... | ... | |
60 | 63 |
logger.add_file_logger('ClientError', __name__, filename=DEBUG_LOG) |
61 | 64 |
clienterrorlog = logger.get_logger('ClientError') |
62 | 65 |
|
66 |
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE'] |
|
67 |
|
|
68 |
|
|
69 |
def _encode(v): |
|
70 |
if v and isinstance(v, unicode): |
|
71 |
return quote(v.encode('utf-8')) |
|
72 |
return v |
|
73 |
|
|
63 | 74 |
|
64 | 75 |
class ClientError(Exception): |
65 | 76 |
def __init__(self, message, status=0, details=None): |
... | ... | |
97 | 108 |
self.details = details if details else [] |
98 | 109 |
|
99 | 110 |
|
111 |
class RequestManager(object): |
|
112 |
"""Handle http request information""" |
|
113 |
|
|
114 |
def _connection_info(self, url, path, params={}): |
|
115 |
""" Set self.url to scheme://netloc/?params |
|
116 |
:param url: (str or unicode) The service url |
|
117 |
|
|
118 |
:param path: (str or unicode) The service path (url/path) |
|
119 |
|
|
120 |
:param params: (dict) Parameters to add to final url |
|
121 |
|
|
122 |
:returns: (scheme, netloc) |
|
123 |
""" |
|
124 |
url = _encode(url) if url else 'http://127.0.0.1/' |
|
125 |
url += '' if url.endswith('/') else '/' |
|
126 |
if path: |
|
127 |
url += _encode(path[1:] if path.startswith('/') else path) |
|
128 |
for i, (key, val) in enumerate(params.items()): |
|
129 |
val = _encode(val) |
|
130 |
url += '%s%s' % ('&' if i else '?', key) |
|
131 |
if val: |
|
132 |
url += '=%s' % val |
|
133 |
parsed = urlparse(url) |
|
134 |
self.url = url |
|
135 |
self.path = parsed.path or '/' |
|
136 |
if parsed.query: |
|
137 |
self.path += '?%s' % parsed.query |
|
138 |
return (parsed.scheme, parsed.netloc) |
|
139 |
|
|
140 |
def __init__( |
|
141 |
self, method, url, path, |
|
142 |
data=None, headers={}, params={}): |
|
143 |
method = method.upper() |
|
144 |
assert method in HTTP_METHODS, 'Invalid http method %s' % method |
|
145 |
if headers: |
|
146 |
assert isinstance(headers, dict) |
|
147 |
self.headers = dict(headers) |
|
148 |
self.method, self.data = method, data |
|
149 |
self.scheme, self.netloc = self._connection_info(url, path, params) |
|
150 |
|
|
151 |
def perform(self, conn): |
|
152 |
""" |
|
153 |
:param conn: (httplib connection object) |
|
154 |
|
|
155 |
:returns: (HTTPResponse) |
|
156 |
""" |
|
157 |
# sendlog.debug( |
|
158 |
# 'RequestManager.perform mthd(%s), url(%s), headrs(%s), bdlen(%s)', |
|
159 |
# self.method, self.url, self.headers, self.data) |
|
160 |
conn.request( |
|
161 |
method=str(self.method.upper()), |
|
162 |
url=str(self.path), |
|
163 |
headers=self.headers, |
|
164 |
body=self.data) |
|
165 |
while True: |
|
166 |
try: |
|
167 |
return conn.getresponse() |
|
168 |
except ResponseNotReady: |
|
169 |
sleep(0.03 * random()) |
|
170 |
|
|
171 |
|
|
172 |
class ResponseManager(object): |
|
173 |
"""Manage the http request and handle the response data, headers, etc.""" |
|
174 |
|
|
175 |
def __init__(self, request, poolsize=None): |
|
176 |
""" |
|
177 |
:param request: (RequestManager) |
|
178 |
""" |
|
179 |
self.request = request |
|
180 |
self._request_performed = False |
|
181 |
self.poolsize = poolsize |
|
182 |
|
|
183 |
def _get_response(self): |
|
184 |
if self._request_performed: |
|
185 |
return |
|
186 |
|
|
187 |
pool_kw = dict(size=self.poolsize) if self.poolsize else dict() |
|
188 |
try: |
|
189 |
with PooledHTTPConnection( |
|
190 |
self.request.netloc, self.request.scheme, |
|
191 |
**pool_kw) as connection: |
|
192 |
r = self.request.perform(connection) |
|
193 |
# recvlog.debug('ResponseManager(%s):' % r) |
|
194 |
self._request_performed = True |
|
195 |
self._headers = dict() |
|
196 |
for k, v in r.getheaders(): |
|
197 |
self.headers[k] = v |
|
198 |
# recvlog.debug('\t%s: %s\t(%s)' % (k, v, r)) |
|
199 |
self._content = r.read() |
|
200 |
self._status_code = r.status |
|
201 |
self._status = r.reason |
|
202 |
except Exception as err: |
|
203 |
from kamaki.clients import recvlog |
|
204 |
from traceback import format_stack |
|
205 |
recvlog.debug('\n'.join(['%s' % type(err)] + format_stack())) |
|
206 |
raise ClientError( |
|
207 |
'Failed while http-connecting to %s (%s)' % ( |
|
208 |
self.request.url, |
|
209 |
err), |
|
210 |
1000) |
|
211 |
|
|
212 |
@property |
|
213 |
def status_code(self): |
|
214 |
self._get_response() |
|
215 |
return self._status_code |
|
216 |
|
|
217 |
@property |
|
218 |
def status(self): |
|
219 |
self._get_response() |
|
220 |
return self._status |
|
221 |
|
|
222 |
@property |
|
223 |
def headers(self): |
|
224 |
self._get_response() |
|
225 |
return self._headers |
|
226 |
|
|
227 |
@property |
|
228 |
def content(self): |
|
229 |
self._get_response() |
|
230 |
return self._content |
|
231 |
|
|
232 |
@property |
|
233 |
def text(self): |
|
234 |
""" |
|
235 |
:returns: (str) content |
|
236 |
""" |
|
237 |
self._get_response() |
|
238 |
return '%s' % self._content |
|
239 |
|
|
240 |
@property |
|
241 |
def json(self): |
|
242 |
""" |
|
243 |
:returns: (dict) squeezed from json-formated content |
|
244 |
""" |
|
245 |
self._get_response() |
|
246 |
try: |
|
247 |
return loads(self._content) |
|
248 |
except ValueError as err: |
|
249 |
ClientError('Response not formated in JSON - %s' % err) |
|
250 |
|
|
251 |
|
|
100 | 252 |
class SilentEvent(Thread): |
101 | 253 |
""" Thread-run method(*args, **kwargs)""" |
102 | 254 |
def __init__(self, method, *args, **kwargs): |
... | ... | |
127 | 279 |
|
128 | 280 |
class Client(object): |
129 | 281 |
|
130 |
def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
|
|
282 |
def __init__(self, base_url, token): |
|
131 | 283 |
self.base_url = base_url |
132 | 284 |
self.token = token |
133 |
self.headers = {}
|
|
285 |
self.headers, self.params = dict(), dict()
|
|
134 | 286 |
self.DATE_FORMATS = [ |
135 | 287 |
'%a %b %d %H:%M:%S %Y', |
136 | 288 |
'%A, %d-%b-%y %H:%M:%S GMT', |
137 | 289 |
'%a, %d %b %Y %H:%M:%S GMT'] |
138 |
self.http_client = http_client |
|
139 | 290 |
self.MAX_THREADS = 7 |
140 | 291 |
|
141 | 292 |
def _init_thread_limit(self, limit=1): |
... | ... | |
179 | 330 |
def set_header(self, name, value, iff=True): |
180 | 331 |
"""Set a header 'name':'value'""" |
181 | 332 |
if value is not None and iff: |
182 |
self.http_client.set_header(name, value)
|
|
333 |
self.headers[name] = value
|
|
183 | 334 |
|
184 | 335 |
def set_param(self, name, value=None, iff=True): |
185 | 336 |
if iff: |
186 |
self.http_client.set_param(name, value)
|
|
337 |
self.params[name] = value
|
|
187 | 338 |
|
188 | 339 |
def request( |
189 |
self, |
|
190 |
method, |
|
191 |
path, |
|
192 |
async_headers={}, |
|
193 |
async_params={}, |
|
340 |
self, method, path, |
|
341 |
async_headers=dict(), async_params=dict(), |
|
194 | 342 |
**kwargs): |
195 | 343 |
"""In threaded/asynchronous requests, headers and params are not safe |
196 | 344 |
Therefore, the standard self.set_header/param system can be used only |
... | ... | |
205 | 353 |
assert method |
206 | 354 |
assert isinstance(path, str) or isinstance(path, unicode) |
207 | 355 |
try: |
356 |
headers = dict(self.headers) |
|
357 |
headers.update(async_headers) |
|
358 |
params = dict(self.params) |
|
359 |
params.update(async_params) |
|
208 | 360 |
success = kwargs.pop('success', 200) |
209 | 361 |
data = kwargs.pop('data', None) |
210 |
self.http_client.headers.setdefault('X-Auth-Token', self.token) |
|
211 |
|
|
362 |
headers.setdefault('X-Auth-Token', self.token) |
|
212 | 363 |
if 'json' in kwargs: |
213 | 364 |
data = dumps(kwargs.pop('json')) |
214 |
self.http_client.headers.setdefault( |
|
215 |
'Content-Type', |
|
216 |
'application/json') |
|
365 |
headers.setdefault('Content-Type', 'application/json') |
|
217 | 366 |
if data: |
218 |
self.http_client.headers.setdefault( |
|
219 |
'Content-Length', |
|
220 |
'%s' % len(data)) |
|
221 |
|
|
222 |
sendlog.info('perform a %s @ %s', method, self.base_url) |
|
223 |
|
|
224 |
self.http_client.url = self.base_url |
|
225 |
self.http_client.path = quote(path.encode('utf8')) |
|
226 |
r = self.http_client.perform_request( |
|
227 |
method, |
|
228 |
data, |
|
229 |
async_headers, |
|
230 |
async_params) |
|
231 |
|
|
232 |
req = self.http_client |
|
233 |
sendlog.info('%s %s', method, req.url) |
|
234 |
headers = dict(req.headers) |
|
235 |
headers.update(async_headers) |
|
236 |
|
|
237 |
for key, val in headers.items(): |
|
367 |
headers.setdefault('Content-Length', '%s' % len(data)) |
|
368 |
|
|
369 |
req = RequestManager( |
|
370 |
method, self.base_url, path, |
|
371 |
data=data, headers=headers, params=params) |
|
372 |
sendlog.info('commit a %s @ %s\t[%s]', method, self.base_url, self) |
|
373 |
sendlog.info('\tpath: %s\t[%s]', req.path, self) |
|
374 |
for key, val in req.headers.items(): |
|
238 | 375 |
if (not LOG_TOKEN) and key.lower() == 'x-auth-token': |
239 | 376 |
continue |
240 |
sendlog.info('\t%s: %s', key, val) |
|
241 |
sendlog.info('') |
|
377 |
sendlog.info('\t%s: %s [%s]', key, val, self) |
|
242 | 378 |
if data: |
243 | 379 |
datasendlog.info(data) |
380 |
sendlog.info('END HTTP request commit\t[%s]', self) |
|
244 | 381 |
|
382 |
r = ResponseManager(req) |
|
245 | 383 |
recvlog.info('%d %s', r.status_code, r.status) |
246 | 384 |
for key, val in r.headers.items(): |
247 | 385 |
if (not LOG_TOKEN) and key.lower() == 'x-auth-token': |
... | ... | |
249 | 387 |
recvlog.info('%s: %s', key, val) |
250 | 388 |
if r.content: |
251 | 389 |
datarecvlog.info(r.content) |
252 |
|
|
253 |
except (KamakiResponseError, KamakiConnectionError) as err: |
|
254 |
from traceback import format_stack |
|
255 |
recvlog.debug('\n'.join(['%s' % type(err)] + format_stack())) |
|
256 |
self.http_client.reset_headers() |
|
257 |
self.http_client.reset_params() |
|
258 |
errstr = '%s' % err |
|
259 |
if not errstr: |
|
260 |
errstr = ('%s' % type(err))[7:-2] |
|
261 |
status = getattr(err, 'status', getattr(err, 'errno', 0)) |
|
262 |
raise ClientError('%s\n' % errstr, status=status) |
|
263 | 390 |
finally: |
264 |
self.http_client.reset_headers()
|
|
265 |
self.http_client.reset_params()
|
|
391 |
self.headers = dict()
|
|
392 |
self.params = dict()
|
|
266 | 393 |
|
267 | 394 |
if success is not None: |
268 | 395 |
# Success can either be an int or a collection |
269 | 396 |
success = (success,) if isinstance(success, int) else success |
270 | 397 |
if r.status_code not in success: |
271 |
r.release() |
|
272 | 398 |
self._raise_for_status(r) |
273 | 399 |
return r |
274 | 400 |
|
Also available in: Unified diff