1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from urllib2 import quote
35 from threading import Thread
36 from json import dumps, loads
39 from kamaki.clients.utils import get_logger, add_file_logger, get_log_filename
40 from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
41 from kamaki.clients.connection.errors import KamakiConnectionError
42 from kamaki.clients.connection.errors import KamakiResponseError
# All loggers of this module are registered against the same log file.
DEBUG_LOG = get_log_filename()


def _open_file_logger(name):
    """Register logger *name* for DEBUG_LOG via add_file_logger, return it"""
    add_file_logger(name, __name__, filename=DEBUG_LOG)
    return get_logger(name)


# Outgoing side: request line/headers and request payloads
sendlog = _open_file_logger('clients.send')
sendlog.debug('Logging location: %s' % DEBUG_LOG)
datasendlog = _open_file_logger('data.send')

# Incoming side: response status/headers and response payloads
recvlog = _open_file_logger('clients.recv')
datarecvlog = _open_file_logger('data.recv')
class ClientError(Exception):
    """Error raised by the clients of this module.

    When *message* embeds a JSON object shaped like
    ``{"<errorKey>": {"message": ..., "code": ..., "details": ...}}`` the
    text preceding the first ``{`` is kept as a prefix and the JSON content
    is folded into the error:

    * the message becomes ``'<prefix> <errorKey> (<message>)'`` (or
      ``'<prefix> <errorKey>'`` if the object has no ``message``)
    * ``code`` (if present) replaces *status*
    * a non-empty ``details`` value is appended to *details*

    A message that does not contain such an object is kept as given.

    :param message: (str) error text, optionally followed by a JSON body
    :param status: (int) status code; anything that is not an int becomes 0
    :param details: optional list (or single item) of extra information

    Attributes: ``status`` (int) and ``details`` (empty list if none).
    """

    def __init__(self, message, status=0, details=None):
        try:
            # Guarantee a trailing newline before looking for a JSON body
            message += '' if message and message[-1] == '\n' else '\n'
            serv_stat, sep, payload = message.partition('{')
            if payload.endswith('\n'):
                payload = payload[:-1]
            json_msg = loads(sep + payload)
            # list() keeps this working where keys() returns a view
            key = list(json_msg.keys())[0]
            serv_stat = serv_stat.strip()

            json_msg = json_msg[key]
            if 'message' in json_msg:
                message = '%s %s (%s)\n' % (
                    serv_stat, key, json_msg['message'])
            else:
                message = '%s %s' % (serv_stat, key)
            status = json_msg.get('code', status)
            if 'details' in json_msg:
                if not details:
                    details = []
                elif isinstance(details, list):
                    # Copy, so the list owned by the caller is not mutated
                    details = list(details)
                else:
                    details = [details]
                if json_msg['details']:
                    details.append(json_msg['details'])
        except Exception:
            # Deliberate best effort: a message without a parsable JSON
            # error body is reported with whatever was gathered so far
            pass

        # Collapse multiple trailing newlines into a single one
        while message.endswith('\n\n'):
            message = message[:-1]
        super(ClientError, self).__init__(message)
        self.status = status if isinstance(status, int) else 0
        self.details = details if details else []
class SilentEvent(Thread):
    """Thread-run method(*args, **kwargs)

    An exception raised by the call is logged and stored on the thread
    instead of propagating; the return value is stored as well.
    """

    def __init__(self, method, *args, **kwargs):
        """
        :param method: callable executed by run()
        :param args: positional arguments handed to method
        :param kwargs: keyword arguments handed to method
        """
        # Name the class explicitly: super(self.__class__, self) resolves to
        # the subclass itself as soon as SilentEvent is subclassed, so the
        # parent initializer would never be reached.
        super(SilentEvent, self).__init__()
        self.method = method
        self.args = args
        self.kwargs = kwargs

    @property
    def exception(self):
        """The exception recorded by run(), or False if there is none"""
        return getattr(self, '_exception', False)

    @property
    def value(self):
        """The value recorded by run(), or None if there is none"""
        return getattr(self, '_value', None)

    def run(self):
        """Call method(*args, **kwargs); record its result or its exception"""
        try:
            self._value = self.method(*(self.args), **(self.kwargs))
        except Exception as e:
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
                self,
                type(e),
                e.status if isinstance(e, ClientError) else '',
                e))
            self._exception = e
120 class Client(object):
def __init__(self, base_url, token, http_client=None):
    """
    :param base_url: (str) service URL; request() assigns it to the
        connection before every call
    :param token: (str) default value for the 'X-Auth-Token' header
    :param http_client: connection object used to perform the requests;
        a new KamakiHTTPConnection is created when omitted
    """
    self.base_url = base_url
    self.token = token
    # NOTE(review): confirm this attribute against the upstream file
    self.headers = {}
    # strftime-style date layouts
    self.DATE_FORMATS = [
        '%a %b %d %H:%M:%S %Y',
        '%A, %d-%b-%y %H:%M:%S GMT',
        '%a, %d %b %Y %H:%M:%S GMT']
    # A default argument is evaluated only once, when the function is
    # defined: building the connection there would make every client share
    # one connection object (and so its headers and params).
    if http_client is None:
        http_client = KamakiHTTPConnection()
    self.http_client = http_client
    # Upper bound used by _watch_thread_limit
    # NOTE(review): confirm the value against the upstream file
    self.MAX_THREADS = 7
def _init_thread_limit(self, limit=1):
    """Reset the thread limit and the elapsed-time counters

    :param limit: (int) initial thread limit, must be positive

    :raises AssertionError: if limit is not a positive int
    """
    # Raised explicitly (same exception type as before) so that the check
    # is not stripped when python runs with -O
    if not (isinstance(limit, int) and limit > 0):
        raise AssertionError('Thread limit not a +int')
    self._thread_limit = limit
    self._elapsed_old = 0.0
    self._elapsed_new = 0.0
def _watch_thread_limit(self, threadlist):
    """Adjust the thread limit and join the threads once it is reached

    The limit grows by one (up to MAX_THREADS) when the previous average
    join time was not shorter than the latest one, and shrinks by one (down
    to 1) otherwise.

    :param threadlist: list of started threads

    :returns: an empty list if all threads were joined, else threadlist
    """
    from time import time

    # Fall back to defaults if _init_thread_limit was never called
    self._thread_limit = getattr(self, '_thread_limit', 1)
    self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
    self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
    recvlog.debug('# running threads: %s' % len(threadlist))
    if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
            self._thread_limit < self.MAX_THREADS):
        self._thread_limit += 1
    elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
        self._thread_limit -= 1

    self._elapsed_old = self._elapsed_new
    if len(threadlist) >= self._thread_limit:
        self._elapsed_new = 0.0
        for thread in threadlist:
            # Time each join; the average feeds the next adjustment
            begin_time = time()
            thread.join()
            self._elapsed_new += time() - begin_time
        self._elapsed_new = self._elapsed_new / len(threadlist)
        return []
    return threadlist
def _raise_for_status(self, r):
    """Raise a ClientError describing response r

    The message is '<status> <text>', where <text> is r.text or, if that
    cannot be read, r itself. The error status is r.status_code, falling
    back to r.status and then to 0.

    :param r: response object

    :raises ClientError: always
    """
    status_msg = getattr(r, 'status', None) or ''
    try:
        message = '%s %s\n' % (status_msg, r.text)
    except Exception:
        # No readable body: describe the response object itself
        message = '%s %s\n' % (status_msg, r)
    status = getattr(r, 'status_code', getattr(r, 'status', 0))
    raise ClientError(message, status=status)
def set_header(self, name, value, iff=True):
    """Set a header 'name':'value', unless value is None or iff is false"""
    if value is None or not iff:
        return
    self.http_client.set_header(name, value)
def set_param(self, name, value=None, iff=True):
    """Set a request parameter 'name':'value', but only if iff holds

    :param name: parameter name
    :param value: parameter value; None is passed on as is
    :param iff: when false, the call does nothing
    """
    if iff:
        self.http_client.set_param(name, value)
186 """In threaded/asynchronous requests, headers and params are not safe
187 Therefore, the standard self.set_header/param system can be used only
188 for headers and params that are common for all requests. All other
189 params and headers should passes as
192 E.g. in most queries the 'X-Auth-Token' header might be the same for
193 all, but the 'Range' header might be different from request to request.
195 assert isinstance(method, str) or isinstance(method, unicode)
197 assert isinstance(path, str) or isinstance(path, unicode)
199 success = kwargs.pop('success', 200)
200 data = kwargs.pop('data', None)
201 self.http_client.headers.setdefault('X-Auth-Token', self.token)
204 data = dumps(kwargs.pop('json'))
205 self.http_client.headers.setdefault(
209 self.http_client.headers.setdefault(
213 sendlog.info('perform a %s @ %s', method, self.base_url)
215 self.http_client.url = self.base_url
216 self.http_client.path = quote(path.encode('utf8'))
217 r = self.http_client.perform_request(
223 req = self.http_client
224 sendlog.info('%s %s', method, req.url)
225 headers = dict(req.headers)
226 headers.update(async_headers)
228 for key, val in headers.items():
229 if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
231 sendlog.info('\t%s: %s', key, val)
234 datasendlog.info(data)
236 recvlog.info('%d %s', r.status_code, r.status)
237 for key, val in r.headers.items():
238 if (not LOG_TOKEN) and key.lower() == 'x-auth-token':
240 recvlog.info('%s: %s', key, val)
242 datarecvlog.info(r.content)
244 except (KamakiResponseError, KamakiConnectionError) as err:
245 from traceback import format_stack
246 recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
247 self.http_client.reset_headers()
248 self.http_client.reset_params()
251 errstr = ('%s' % type(err))[7:-2]
252 status = getattr(err, 'status', getattr(err, 'errno', 0))
253 raise ClientError('%s\n' % errstr, status=status)
255 self.http_client.reset_headers()
256 self.http_client.reset_params()
258 if success is not None:
259 # Success can either be an int or a collection
260 success = (success,) if isinstance(success, int) else success
261 if r.status_code not in success:
263 self._raise_for_status(r)
def delete(self, path, **kwargs):
    """Shortcut for self.request('delete', path, **kwargs)"""
    http_method = 'delete'
    return self.request(http_method, path, **kwargs)
def get(self, path, **kwargs):
    """Shortcut for self.request('get', path, **kwargs)"""
    http_method = 'get'
    return self.request(http_method, path, **kwargs)
def head(self, path, **kwargs):
    """Shortcut for self.request('head', path, **kwargs)"""
    http_method = 'head'
    return self.request(http_method, path, **kwargs)
def post(self, path, **kwargs):
    """Shortcut for self.request('post', path, **kwargs)"""
    http_method = 'post'
    return self.request(http_method, path, **kwargs)
def put(self, path, **kwargs):
    """Shortcut for self.request('put', path, **kwargs)"""
    http_method = 'put'
    return self.request(http_method, path, **kwargs)
def copy(self, path, **kwargs):
    """Shortcut for self.request('copy', path, **kwargs)"""
    http_method = 'copy'
    return self.request(http_method, path, **kwargs)
def move(self, path, **kwargs):
    """Shortcut for self.request('move', path, **kwargs)"""
    http_method = 'move'
    return self.request(http_method, path, **kwargs)