1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, self.list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, self.list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote
35 from threading import Thread
36 from json import dumps, loads
39 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40 from kamaki.clients.connection.errors import HTTPConnectionError
41 from kamaki.clients.connection.errors import HTTPResponseError
43 sendlog = logging.getLogger('clients.send')
44 datasendlog = logging.getLogger('data.send')
45 recvlog = logging.getLogger('clients.recv')
46 datarecvlog = logging.getLogger('data.recv')
49 class ClientError(Exception):
50 def __init__(self, message, status=0, details=None):
52 message += '' if message and message[-1] == '\n' else '\n'
53 serv_stat, sep, new_msg = message.partition('{')
54 new_msg = sep + new_msg
55 json_msg = loads(new_msg)
56 key = json_msg.keys()[0]
58 json_msg = json_msg[key]
59 message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60 if 'message' in json_msg else '%s %s' % (serv_stat, key)
61 if 'code' in json_msg:
62 status = json_msg['code']
63 if 'details' in json_msg:
66 elif not isinstance(details, list):
68 if json_msg['details']:
69 details.append(json_msg['details'])
73 super(ClientError, self).__init__(message)
75 self.details = details if details else []
78 class SilentEvent(Thread):
79 """ Thread-run method(*args, **kwargs)
80 put exception in exception_bucket
82 def __init__(self, method, *args, **kwargs):
83 super(self.__class__, self).__init__()
90 return getattr(self, '_exception', False)
94 return getattr(self, '_value', None)
98 self._value = self.method(*(self.args), **(self.kwargs))
99 except Exception as e:
100 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
103 e.status if isinstance(e, ClientError) else '',
108 class Client(object):
111 def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112 self.base_url = base_url
115 self.DATE_FORMATS = [
116 '%a %b %d %H:%M:%S %Y',
117 '%A, %d-%b-%y %H:%M:%S GMT',
118 '%a, %d %b %Y %H:%M:%S GMT']
119 self.http_client = http_client
121 def _init_thread_limit(self, limit=1):
122 self._thread_limit = limit
123 self._elapsed_old = 0.0
124 self._elapsed_new = 0.0
126 def _watch_thread_limit(self, threadlist):
127 recvlog.debug('# running threads: %s' % len(threadlist))
128 if (self._elapsed_old > self._elapsed_new) and (
129 self._thread_limit < self.POOL_SIZE):
130 self._thread_limit += 1
131 elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
132 self._thread_limit -= 1
134 self._elapsed_old = self._elapsed_new
135 if len(threadlist) >= self._thread_limit:
136 self._elapsed_new = 0.0
137 for thread in threadlist:
140 self._elapsed_new += time() - begin_time
141 self._elapsed_new = self._elapsed_new / len(threadlist)
145 def _raise_for_status(self, r):
146 status_msg = getattr(r, 'status', '')
148 message = '%s %s\n' % (status_msg, r.text)
150 message = '%s %s\n' % (status_msg, r)
151 status = getattr(r, 'status_code', getattr(r, 'status', 0))
152 raise ClientError(message, status=status)
154 def set_header(self, name, value, iff=True):
155 """Set a header 'name':'value'"""
156 if value is not None and iff:
157 self.http_client.set_header(name, value)
159 def set_param(self, name, value=None, iff=True):
161 self.http_client.set_param(name, value)
163 def set_default_header(self, name, value):
164 self.http_client.headers.setdefault(name, value)
173 """In threaded/asynchronous requests, headers and params are not safe
174 Therefore, the standard self.set_header/param system can be used only
175 for headers and params that are common for all requests. All other
176 params and headers should passes as
179 E.g. in most queries the 'X-Auth-Token' header might be the same for
180 all, but the 'Range' header might be different from request to request.
183 success = kwargs.pop('success', 200)
185 data = kwargs.pop('data', None)
186 self.set_default_header('X-Auth-Token', self.token)
189 data = dumps(kwargs.pop('json'))
190 self.set_default_header('Content-Type', 'application/json')
192 self.set_default_header('Content-Length', '%s' % len(data))
194 sendlog.info('perform a %s @ %s', method, self.base_url)
196 self.http_client.url = self.base_url + (
197 '/' if (self.base_url and self.base_url[-1]) != '/' else '')
198 self.http_client.path = quote(path.encode('utf8'))
199 r = self.http_client.perform_request(
205 req = self.http_client
206 sendlog.info('%s %s', method, req.url)
207 headers = dict(req.headers)
208 headers.update(async_headers)
210 for key, val in headers.items():
211 sendlog.info('\t%s: %s', key, val)
214 datasendlog.info(data)
216 recvlog.info('%d %s', r.status_code, r.status)
217 for key, val in r.headers.items():
218 recvlog.info('%s: %s', key, val)
220 datarecvlog.info(r.content)
222 except (HTTPResponseError, HTTPConnectionError) as err:
223 from traceback import format_stack
224 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
225 self.http_client.reset_headers()
226 self.http_client.reset_params()
229 errstr = ('%s' % type(err))[7:-2]
230 status = getattr(err, 'status', getattr(err, 'errno', 0))
231 raise ClientError('%s\n' % errstr, status=status)
233 self.http_client.reset_headers()
234 self.http_client.reset_params()
236 if success is not None:
237 # Success can either be an in or a collection
238 success = (success,) if isinstance(success, int) else success
239 if r.status_code not in success:
241 self._raise_for_status(r)
244 def delete(self, path, **kwargs):
245 return self.request('delete', path, **kwargs)
247 def get(self, path, **kwargs):
248 return self.request('get', path, **kwargs)
250 def head(self, path, **kwargs):
251 return self.request('head', path, **kwargs)
253 def post(self, path, **kwargs):
254 return self.request('post', path, **kwargs)
256 def put(self, path, **kwargs):
257 return self.request('put', path, **kwargs)
259 def copy(self, path, **kwargs):
260 return self.request('copy', path, **kwargs)
262 def move(self, path, **kwargs):
263 return self.request('move', path, **kwargs)