1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import Thread
35 from json import dumps, loads
38 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39 from kamaki.clients.connection.errors import HTTPConnectionError
40 from kamaki.clients.connection.errors import HTTPResponseError
# Module-level loggers used by the client code in this file:
#   sendlog     - outgoing request line and request headers
#   datasendlog - outgoing request body
#   recvlog     - response status line and headers, plus debug traces
#   datarecvlog - response body
42 sendlog = logging.getLogger('clients.send')
43 datasendlog = logging.getLogger('data.send')
44 recvlog = logging.getLogger('clients.recv')
45 datarecvlog = logging.getLogger('data.recv')
# Exception raised by the client layer. When the message carries a JSON
# payload, the payload is used to rebuild the message and may override the
# status and extend the details.
# NOTE(review): several original lines are absent from this extract (no
# `try:` is visible around the JSON parsing, and the branch preceding the
# `elif` below is missing) - confirm the control flow against the full file.
48 class ClientError(Exception):
49 def __init__(self, message, status=0, details=None):
# Append a newline unless the message already ends with one.
51 message += '' if message and message[-1] == '\n' else '\n'
# Everything before the first '{' is kept as the server status text; the
# rest (brace included) is parsed as JSON.
52 serv_stat, sep, new_msg = message.partition('{')
53 new_msg = sep + new_msg
54 json_msg = loads(new_msg)
# The first key of the payload names the error and its value holds the
# error fields. Indexing keys() relies on Python 2, where it is a list.
55 key = json_msg.keys()[0]
57 json_msg = json_msg[key]
# Message becomes '<status text> <key> (<payload message>)' when the payload
# has a 'message' field, otherwise '<status text> <key>'.
58 message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
59 if 'message' in json_msg else '%s %s' % (serv_stat, key)
# A 'code' field in the payload replaces the status given by the caller.
60 if 'code' in json_msg:
61 status = json_msg['code']
# A non-empty 'details' field in the payload is appended to `details`.
# NOTE(review): how `details` is normalised to a list before the append is
# on lines not present in this extract.
62 if 'details' in json_msg:
65 elif not isinstance(details, list):
67 if json_msg['details']:
68 details.append(json_msg['details'])
72 super(ClientError, self).__init__(message)
# A falsy `details` (None or empty) is stored as a fresh empty list.
74 self.details = details if details else []
# Thread subclass that calls self.method(*self.args, **self.kwargs) and keeps
# the result in self._value; exceptions raised by the call are caught and
# logged at debug level.
# NOTE(review): the method headers (and the docstring terminator) are absent
# from this extract; presumably the caught exception is also stored in
# self._exception - confirm against the full file.
77 class SilentEvent(Thread):
78 """ Thread-run method(*args, **kwargs)
79 put exception in exception_bucket
81 def __init__(self, method, *args, **kwargs):
# NOTE(review): super(self.__class__, self) recurses forever if this class
# is ever subclassed; super(SilentEvent, self) is the safe spelling.
82 super(self.__class__, self).__init__()
# Falls back to False when no `_exception` attribute has been set.
89 return getattr(self, '_exception', False)
# Falls back to None when no `_value` attribute has been set.
93 return getattr(self, '_value', None)
# Run the wrapped callable and remember what it returned.
97 self._value = self.method(*(self.args), **(self.kwargs))
# The status is logged only when the exception is a ClientError.
98 except Exception as e:
99 recvlog.debug('Thread %s got exception %s\n<%s %s' % (
102 e.status if isinstance(e, ClientError) else '',
107 class Client(object):
# Store the service base URL and the HTTP connection object, and define the
# date formats known to this client.
# NOTE(review): the default KamakiHTTPConnection() is created once, when the
# `def` statement is evaluated, so every Client built without an explicit
# http_client shares the same connection object.
# NOTE(review): no assignment of `token` is visible here although self.token
# is read elsewhere in the class; it is presumably on a line missing from
# this extract.
110 def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
111 self.base_url = base_url
# strptime/strftime patterns matching the three HTTP-date layouts (asctime,
# RFC 850, RFC 1123). The code that uses them is not part of this extract.
114 self.DATE_FORMATS = [
115 '%a %b %d %H:%M:%S %Y',
116 '%A, %d-%b-%y %H:%M:%S GMT',
117 '%a, %d %b %Y %H:%M:%S GMT']
118 self.http_client = http_client
def _init_thread_limit(self, limit=1):
    """Reset the thread-limit bookkeeping.

    :param limit: (int) value the thread limit starts from (default 1)

    Both elapsed-time measurements are cleared to 0.0.
    """
    self._elapsed_old = self._elapsed_new = 0.0
    self._thread_limit = limit
# Adjust self._thread_limit from the two most recent elapsed-time averages
# and, once `threadlist` has reached the limit, measure a new average.
# NOTE(review): the lines that define `begin_time` (and whatever is awaited
# inside the loop) as well as the return statements are absent from this
# extract - confirm against the full file.
125 def _watch_thread_limit(self, threadlist):
126 recvlog.debug('# running threads: %s' % len(threadlist))
# Newer average smaller than the older one: allow one more thread, capped by
# POOL_SIZE. Newer average larger: allow one thread fewer, never below 1.
127 if (self._elapsed_old > self._elapsed_new) and (
128 self._thread_limit < self.POOL_SIZE):
129 self._thread_limit += 1
130 elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
131 self._thread_limit -= 1
# The latest measurement becomes the reference for the next call.
133 self._elapsed_old = self._elapsed_new
134 if len(threadlist) >= self._thread_limit:
# Accumulate the elapsed time per thread, then divide by the thread count.
135 self._elapsed_new = 0.0
136 for thread in threadlist:
139 self._elapsed_new += time() - begin_time
140 self._elapsed_new = self._elapsed_new / len(threadlist)
# Always raises ClientError for response `r`. The message starts with
# r.status (empty string if absent) followed by r.text or by `r` itself; the
# error status is r.status_code, else r.status, else 0.
# NOTE(review): the statements that choose between the two `message = ...`
# assignments are absent from this extract (likely a try/except around
# r.text) - confirm against the full file.
144 def _raise_for_status(self, r):
145 status_msg = getattr(r, 'status', '')
147 message = '%s %s\n' % (status_msg, r.text)
149 message = '%s %s\n' % (status_msg, r)
150 status = getattr(r, 'status_code', getattr(r, 'status', 0))
151 raise ClientError(message, status=status)
def set_header(self, name, value, iff=True):
    """Set header `name` to `value` on the HTTP client.

    :param name: header name
    :param value: header value; None means "do not set anything"
    :param iff: the header is set only if this is true
    """
    if value is None or not iff:
        return
    self.http_client.set_header(name, value)
# Pass parameter `name` with `value` to the HTTP client's set_param.
# NOTE(review): `iff` is accepted, but the line between the `def` and the
# call is absent from this extract, so whether and how the call is guarded
# cannot be confirmed here.
158 def set_param(self, name, value=None, iff=True):
160 self.http_client.set_param(name, value)
def set_default_header(self, name, value):
    """Give header `name` the value `value` unless it is already present.

    Works directly on the HTTP client's `headers` mapping.
    """
    client_headers = self.http_client.headers
    client_headers.setdefault(name, value)
172 """In threaded/asynchronous requests, headers and params are not safe
173 Therefore, the standard self.set_header/param system can be used only
174 for headers and params that are common for all requests. All other
175 params and headers should be passed as
178 E.g. in most queries the 'X-Auth-Token' header might be the same for
179 all, but the 'Range' header might be different from request to request.
# NOTE(review): the `def` line of this method and several interior lines
# (docstring end, `try:`, the `if` guards, the perform_request arguments,
# the final return) are absent from this extract; `method` and
# `async_headers` are defined on those missing lines.
# Accepted status code(s); 200 unless the caller passes `success`.
182 success = kwargs.pop('success', 200)
184 data = kwargs.pop('data', None)
# Default headers do not replace a value that is already set.
185 self.set_default_header('X-Auth-Token', self.token)
# A 'json' keyword argument is serialised and used as the request body.
188 data = dumps(kwargs.pop('json'))
189 self.set_default_header('Content-Type', 'application/json')
# NOTE(review): len(data) counts characters if data is a unicode object -
# confirm data is always a byte string when this line runs.
191 self.set_default_header('Content-Length', unicode(len(data)))
193 sendlog.info('perform a %s @ %s', method, self.base_url)
# Point the shared HTTP client at the target and send the request.
195 self.http_client.url = self.base_url
196 self.http_client.path = path
197 r = self.http_client.perform_request(
# Log the request: shared client headers merged with the per-request ones,
# then the body.
203 req = self.http_client
204 sendlog.info('%s %s', method, req.url)
205 headers = dict(req.headers)
206 headers.update(async_headers)
208 for key, val in headers.items():
209 sendlog.info('\t%s: %s', key, val)
212 datasendlog.info(data)
# Log the response: status line, headers, then content.
214 recvlog.info('%d %s', r.status_code, r.status)
215 for key, val in r.headers.items():
216 recvlog.info('%s: %s', key, val)
218 datarecvlog.info(r.content)
# Connection and response errors are logged with a stack trace, the client's
# headers and params are reset, and the error is re-raised as ClientError.
220 except (HTTPResponseError, HTTPConnectionError) as err:
221 from traceback import format_stack
222 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
223 self.http_client.reset_headers()
224 self.http_client.reset_params()
# Drops the first 7 and last 2 characters of the printed exception type.
227 errstr = ('%s' % type(err))[7:-2]
# Status is err.status, else err.errno, else 0.
228 status = getattr(err, 'status', getattr(err, 'errno', 0))
229 raise ClientError('%s\n' % errstr, status=status)
# Headers and params are cleared after a completed request as well.
231 self.http_client.reset_headers()
232 self.http_client.reset_params()
# success=None switches the status check off entirely.
234 if success is not None:
235 # Success can either be an int or a collection
236 success = (success,) if isinstance(success, int) else success
237 if r.status_code not in success:
239 self._raise_for_status(r)
def delete(self, path, **kwargs):
    """Shortcut for self.request('delete', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'delete'
    return self.request(http_method, path, **kwargs)
def get(self, path, **kwargs):
    """Shortcut for self.request('get', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'get'
    return self.request(http_method, path, **kwargs)
def head(self, path, **kwargs):
    """Shortcut for self.request('head', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'head'
    return self.request(http_method, path, **kwargs)
def post(self, path, **kwargs):
    """Shortcut for self.request('post', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'post'
    return self.request(http_method, path, **kwargs)
def put(self, path, **kwargs):
    """Shortcut for self.request('put', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'put'
    return self.request(http_method, path, **kwargs)
def copy(self, path, **kwargs):
    """Shortcut for self.request('copy', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'copy'
    return self.request(http_method, path, **kwargs)
def move(self, path, **kwargs):
    """Shortcut for self.request('move', path, **kwargs).

    :returns: whatever self.request returns
    """
    http_method = 'move'
    return self.request(http_method, path, **kwargs)