1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, self.list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, self.list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote
35 from threading import Thread
36 from json import dumps, loads
39 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40 from kamaki.clients.connection.errors import HTTPConnectionError
41 from kamaki.clients.connection.errors import HTTPResponseError
43 sendlog = logging.getLogger('clients.send')
44 datasendlog = logging.getLogger('data.send')
45 recvlog = logging.getLogger('clients.recv')
46 datarecvlog = logging.getLogger('data.recv')
49 class ClientError(Exception):
50 def __init__(self, message, status=0, details=None):
52 message += '' if message and message[-1] == '\n' else '\n'
53 serv_stat, sep, new_msg = message.partition('{')
54 new_msg = sep + new_msg
55 json_msg = loads(new_msg)
56 key = json_msg.keys()[0]
58 json_msg = json_msg[key]
59 message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60 if 'message' in json_msg else '%s %s' % (serv_stat, key)
61 if 'code' in json_msg:
62 status = json_msg['code']
63 if 'details' in json_msg:
66 elif not isinstance(details, list):
68 if json_msg['details']:
69 details.append(json_msg['details'])
73 super(ClientError, self).__init__(message)
75 self.details = details if details else []
78 class SilentEvent(Thread):
79 """ Thread-run method(*args, **kwargs)
80 put exception in exception_bucket
82 def __init__(self, method, *args, **kwargs):
83 super(self.__class__, self).__init__()
90 return getattr(self, '_exception', False)
94 return getattr(self, '_value', None)
98 self._value = self.method(*(self.args), **(self.kwargs))
99 except Exception as e:
100 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
103 e.status if isinstance(e, ClientError) else '',
108 class Client(object):
111 def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112 self.base_url = base_url
115 self.DATE_FORMATS = [
116 '%a %b %d %H:%M:%S %Y',
117 '%A, %d-%b-%y %H:%M:%S GMT',
118 '%a, %d %b %Y %H:%M:%S GMT']
119 self.http_client = http_client
121 def _init_thread_limit(self, limit=1):
122 self._thread_limit = limit
123 self._elapsed_old = 0.0
124 self._elapsed_new = 0.0
126 def _watch_thread_limit(self, threadlist):
127 recvlog.debug('# running threads: %s' % len(threadlist))
128 if (self._elapsed_old > self._elapsed_new) and (
129 self._thread_limit < self.POOL_SIZE):
130 self._thread_limit += 1
131 elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
132 self._thread_limit -= 1
134 self._elapsed_old = self._elapsed_new
135 if len(threadlist) >= self._thread_limit:
136 self._elapsed_new = 0.0
137 for thread in threadlist:
140 self._elapsed_new += time() - begin_time
141 self._elapsed_new = self._elapsed_new / len(threadlist)
145 def _raise_for_status(self, r):
146 status_msg = getattr(r, 'status', '')
148 message = '%s %s\n' % (status_msg, r.text)
150 message = '%s %s\n' % (status_msg, r)
151 status = getattr(r, 'status_code', getattr(r, 'status', 0))
152 raise ClientError(message, status=status)
154 def set_header(self, name, value, iff=True):
155 """Set a header 'name':'value'"""
156 if value is not None and iff:
157 self.http_client.set_header(name, value)
159 def set_param(self, name, value=None, iff=True):
161 self.http_client.set_param(name, value)
163 def set_default_header(self, name, value):
164 self.http_client.headers.setdefault(name, value)
173 """In threaded/asynchronous requests, headers and params are not safe
174 Therefore, the standard self.set_header/param system can be used only
175 for headers and params that are common for all requests. All other
176 params and headers should passes as
179 E.g. in most queries the 'X-Auth-Token' header might be the same for
180 all, but the 'Range' header might be different from request to request.
183 success = kwargs.pop('success', 200)
185 data = kwargs.pop('data', None)
186 self.set_default_header('X-Auth-Token', self.token)
189 data = dumps(kwargs.pop('json'))
190 self.set_default_header('Content-Type', 'application/json')
192 self.set_default_header('Content-Length', '%s' % len(data))
194 sendlog.info('perform a %s @ %s', method, self.base_url)
196 self.http_client.url = self.base_url
197 self.http_client.path = quote(path)
198 r = self.http_client.perform_request(
204 req = self.http_client
205 sendlog.info('%s %s', method, req.url)
206 headers = dict(req.headers)
207 headers.update(async_headers)
209 for key, val in headers.items():
210 sendlog.info('\t%s: %s', key, val)
213 datasendlog.info(data)
215 recvlog.info('%d %s', r.status_code, r.status)
216 for key, val in r.headers.items():
217 recvlog.info('%s: %s', key, val)
219 datarecvlog.info(r.content)
221 except (HTTPResponseError, HTTPConnectionError) as err:
222 from traceback import format_stack
223 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
224 self.http_client.reset_headers()
225 self.http_client.reset_params()
228 errstr = ('%s' % type(err))[7:-2]
229 status = getattr(err, 'status', getattr(err, 'errno', 0))
230 raise ClientError('%s\n' % errstr, status=status)
232 self.http_client.reset_headers()
233 self.http_client.reset_params()
235 if success is not None:
236 # Success can either be an in or a collection
237 success = (success,) if isinstance(success, int) else success
238 if r.status_code not in success:
240 self._raise_for_status(r)
243 def delete(self, path, **kwargs):
244 return self.request('delete', path, **kwargs)
246 def get(self, path, **kwargs):
247 return self.request('get', path, **kwargs)
249 def head(self, path, **kwargs):
250 return self.request('head', path, **kwargs)
252 def post(self, path, **kwargs):
253 return self.request('post', path, **kwargs)
255 def put(self, path, **kwargs):
256 return self.request('put', path, **kwargs)
258 def copy(self, path, **kwargs):
259 return self.request('copy', path, **kwargs)
261 def move(self, path, **kwargs):
262 return self.request('move', path, **kwargs)