Move cluster handling in server create/delete
[kamaki] / kamaki / clients / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, self.list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, self.list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from urllib2 import quote, unquote
35 from urlparse import urlparse
36 from threading import Thread
37 from json import dumps, loads
38 from time import time
39 from httplib import ResponseNotReady, HTTPException
40 from time import sleep
41 from random import random
42 from logging import getLogger
43
44 from objpool.http import PooledHTTPConnection
45
46
47 TIMEOUT = 60.0   # seconds
48 HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49
50 log = getLogger(__name__)
51 sendlog = getLogger('%s.send' % __name__)
52 recvlog = getLogger('%s.recv' % __name__)
53
54
55 def _encode(v):
56     if v and isinstance(v, unicode):
57         return quote(v.encode('utf-8'))
58     return v
59
60
61 class ClientError(Exception):
62     def __init__(self, message, status=0, details=None):
63         log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64             message,
65             status,
66             details))
67         try:
68             message += '' if message and message[-1] == '\n' else '\n'
69             serv_stat, sep, new_msg = message.partition('{')
70             new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71             json_msg = loads(new_msg)
72             key = json_msg.keys()[0]
73             serv_stat = serv_stat.strip()
74
75             json_msg = json_msg[key]
76             message = '%s %s (%s)\n' % (
77                 serv_stat,
78                 key,
79                 json_msg['message']) if (
80                     'message' in json_msg) else '%s %s' % (serv_stat, key)
81             status = json_msg.get('code', status)
82             if 'details' in json_msg:
83                 if not details:
84                     details = []
85                 if not isinstance(details, list):
86                     details = [details]
87                 if json_msg['details']:
88                     details.append(json_msg['details'])
89         except Exception:
90             pass
91         finally:
92             while message.endswith('\n\n'):
93                 message = message[:-1]
94             super(ClientError, self).__init__(message)
95             self.status = status if isinstance(status, int) else 0
96             self.details = details if details else []
97
98
99 class Logged(object):
100
101     LOG_TOKEN = False
102     LOG_DATA = False
103     LOG_PID = False
104     _token = None
105
106
107 class RequestManager(Logged):
108     """Handle http request information"""
109
110     def _connection_info(self, url, path, params={}):
111         """ Set self.url to scheme://netloc/?params
112         :param url: (str or unicode) The service url
113
114         :param path: (str or unicode) The service path (url/path)
115
116         :param params: (dict) Parameters to add to final url
117
118         :returns: (scheme, netloc)
119         """
120         url = _encode(str(url)) if url else 'http://127.0.0.1/'
121         url += '' if url.endswith('/') else '/'
122         if path:
123             url += _encode(path[1:] if path.startswith('/') else path)
124         delim = '?'
125         for key, val in params.items():
126             val = '' if val in (None, False) else _encode(u'%s' % val)
127             url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128             delim = '&'
129         parsed = urlparse(url)
130         self.url = url
131         self.path = parsed.path or '/'
132         if parsed.query:
133             self.path += '?%s' % parsed.query
134         return (parsed.scheme, parsed.netloc)
135
136     def __init__(
137             self, method, url, path,
138             data=None, headers={}, params={}):
139         method = method.upper()
140         assert method in HTTP_METHODS, 'Invalid http method %s' % method
141         if headers:
142             assert isinstance(headers, dict)
143         self.headers = dict(headers)
144         self.method, self.data = method, data
145         self.scheme, self.netloc = self._connection_info(url, path, params)
146
147     def dump_log(self):
148         plog = '\t[%s]' if self.LOG_PID else ''
149         sendlog.info('- -  -   -     -        -             -')
150         sendlog.info('%s %s://%s%s%s' % (
151             self.method, self.scheme, self.netloc, self.path, plog))
152         for key, val in self.headers.items():
153             if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154                 self._token, val = val, '...'
155             sendlog.info('  %s: %s%s' % (key, val, plog))
156         if self.data:
157             sendlog.info('data size:%s%s' % (len(self.data), plog))
158             if self.LOG_DATA:
159                 sendlog.info(self.data.replace(self._token, '...') if (
160                     self._token) else self.data)
161         else:
162             sendlog.info('data size:0%s' % plog)
163
164     def perform(self, conn):
165         """
166         :param conn: (httplib connection object)
167
168         :returns: (HTTPResponse)
169         """
170         self.dump_log()
171         conn.request(
172             method=str(self.method.upper()),
173             url=str(self.path),
174             headers=self.headers,
175             body=self.data)
176         sendlog.info('')
177         keep_trying = TIMEOUT
178         while keep_trying > 0:
179             try:
180                 return conn.getresponse()
181             except ResponseNotReady:
182                 wait = 0.03 * random()
183                 sleep(wait)
184                 keep_trying -= wait
185         plog = '\t[%s]' if self.LOG_PID else ''
186         logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187         recvlog.debug(logmsg)
188         raise ClientError('HTTPResponse takes too long - kamaki timeout')
189
190
191 class ResponseManager(Logged):
192     """Manage the http request and handle the response data, headers, etc."""
193
194     def __init__(self, request, poolsize=None, connection_retry_limit=0):
195         """
196         :param request: (RequestManager)
197
198         :param poolsize: (int) the size of the connection pool
199
200         :param connection_retry_limit: (int)
201         """
202         self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203         self.request = request
204         self._request_performed = False
205         self.poolsize = poolsize
206
207     def _get_response(self):
208         if self._request_performed:
209             return
210
211         pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212         for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213             try:
214                 with PooledHTTPConnection(
215                         self.request.netloc, self.request.scheme,
216                         **pool_kw) as connection:
217                     self.request.LOG_TOKEN = self.LOG_TOKEN
218                     self.request.LOG_DATA = self.LOG_DATA
219                     self.request.LOG_PID = self.LOG_PID
220                     r = self.request.perform(connection)
221                     plog = ''
222                     if self.LOG_PID:
223                         recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224                             self, r, self.request))
225                         plog = '\t[%s]' % self
226                     self._request_performed = True
227                     self._status_code, self._status = r.status, unquote(
228                         r.reason)
229                     recvlog.info(
230                         '%d %s%s' % (
231                             self.status_code, self.status, plog))
232                     self._headers = dict()
233                     for k, v in r.getheaders():
234                         if k.lower in ('x-auth-token', ) and (
235                                 not self.LOG_TOKEN):
236                             self._token, v = v, '...'
237                         v = unquote(v)
238                         self._headers[k] = v
239                         recvlog.info('  %s: %s%s' % (k, v, plog))
240                     self._content = r.read()
241                     recvlog.info('data size: %s%s' % (
242                         len(self._content) if self._content else 0, plog))
243                     if self.LOG_DATA and self._content:
244                         data = '%s%s' % (self._content, plog)
245                         if self._token:
246                             data = data.replace(self._token, '...')
247                         sendlog.info(data)
248                     sendlog.info('-             -        -     -   -  - -')
249                 break
250             except Exception as err:
251                 if isinstance(err, HTTPException):
252                     if retries >= self.CONNECTION_TRY_LIMIT:
253                         raise ClientError(
254                             'Connection to %s failed %s times (%s: %s )' % (
255                                 self.request.url, retries, type(err), err))
256                 else:
257                     from traceback import format_stack
258                     recvlog.debug(
259                         '\n'.join(['%s' % type(err)] + format_stack()))
260                     raise ClientError(
261                         'Failed while http-connecting to %s (%s)' % (
262                             self.request.url, err))
263
264     @property
265     def status_code(self):
266         self._get_response()
267         return self._status_code
268
269     @property
270     def status(self):
271         self._get_response()
272         return self._status
273
274     @property
275     def headers(self):
276         self._get_response()
277         return self._headers
278
279     @property
280     def content(self):
281         self._get_response()
282         return self._content
283
284     @property
285     def text(self):
286         """
287         :returns: (str) content
288         """
289         self._get_response()
290         return '%s' % self._content
291
292     @property
293     def json(self):
294         """
295         :returns: (dict) squeezed from json-formated content
296         """
297         self._get_response()
298         try:
299             return loads(self._content)
300         except ValueError as err:
301             raise ClientError('Response not formated in JSON - %s' % err)
302
303
304 class SilentEvent(Thread):
305     """Thread-run method(*args, **kwargs)"""
306     def __init__(self, method, *args, **kwargs):
307         super(self.__class__, self).__init__()
308         self.method = method
309         self.args = args
310         self.kwargs = kwargs
311
312     @property
313     def exception(self):
314         return getattr(self, '_exception', False)
315
316     @property
317     def value(self):
318         return getattr(self, '_value', None)
319
320     def run(self):
321         try:
322             self._value = self.method(*(self.args), **(self.kwargs))
323         except Exception as e:
324             recvlog.debug('Thread %s got exception %s\n<%s %s' % (
325                 self,
326                 type(e),
327                 e.status if isinstance(e, ClientError) else '',
328                 e))
329             self._exception = e
330
331
332 class Client(Logged):
333
334     MAX_THREADS = 7
335     DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336     CONNECTION_RETRY_LIMIT = 0
337
338     def __init__(self, base_url, token):
339         assert base_url, 'No base_url for client %s' % self
340         self.base_url = base_url
341         self.token = token
342         self.headers, self.params = dict(), dict()
343
344     def _init_thread_limit(self, limit=1):
345         assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
346         self._thread_limit = limit
347         self._elapsed_old = 0.0
348         self._elapsed_new = 0.0
349
350     def _watch_thread_limit(self, threadlist):
351         self._thread_limit = getattr(self, '_thread_limit', 1)
352         self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
353         self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
354         recvlog.debug('# running threads: %s' % len(threadlist))
355         if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
356                 self._thread_limit < self.MAX_THREADS):
357             self._thread_limit += 1
358         elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
359             self._thread_limit -= 1
360
361         self._elapsed_old = self._elapsed_new
362         if len(threadlist) >= self._thread_limit:
363             self._elapsed_new = 0.0
364             for thread in threadlist:
365                 begin_time = time()
366                 thread.join()
367                 self._elapsed_new += time() - begin_time
368             self._elapsed_new = self._elapsed_new / len(threadlist)
369             return []
370         return threadlist
371
372     def async_run(self, method, kwarg_list):
373         """Fire threads of operations
374
375         :param method: the method to run in each thread
376
377         :param kwarg_list: (list of dicts) the arguments to pass in each method
378             call
379
380         :returns: (list) the results of each method call w.r. to the order of
381             kwarg_list
382         """
383         flying, results = {}, {}
384         self._init_thread_limit()
385         for index, kwargs in enumerate(kwarg_list):
386             self._watch_thread_limit(flying.values())
387             flying[index] = SilentEvent(method=method, **kwargs)
388             flying[index].start()
389             unfinished = {}
390             for key, thread in flying.items():
391                 if thread.isAlive():
392                     unfinished[key] = thread
393                 elif thread.exception:
394                     print 'HERE IS AN EXCEPTION MK?'
395                     raise thread.exception
396                 else:
397                     results[key] = thread.value
398                 print 'NO EXCEPTION', thread.value
399             flying = unfinished
400         sendlog.info('- - - wait for threads to finish')
401         for key, thread in flying.items():
402             if thread.isAlive():
403                 thread.join()
404             elif thread.exception:
405                 print 'HERE IS AN EXCEPTION MK-2?'
406                 raise thread.exception
407             results[key] = thread.value
408             print 'NO EXCEPTION-2', thread.value
409         return results.values()
410
411     def _raise_for_status(self, r):
412         log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
413         status_msg = getattr(r, 'status', None) or ''
414         try:
415             message = '%s %s\n' % (status_msg, r.text)
416         except:
417             message = '%s %s\n' % (status_msg, r)
418         status = getattr(r, 'status_code', getattr(r, 'status', 0))
419         raise ClientError(message, status=status)
420
421     def set_header(self, name, value, iff=True):
422         """Set a header 'name':'value'"""
423         if value is not None and iff:
424             self.headers[name] = unicode(value)
425
426     def set_param(self, name, value=None, iff=True):
427         if iff:
428             self.params[name] = unicode(value)
429
430     def request(
431             self, method, path,
432             async_headers=dict(), async_params=dict(),
433             **kwargs):
434         """Commit an HTTP request to base_url/path
435         Requests are commited to and performed by Request/ResponseManager
436         These classes perform a lazy http request. Present method, by default,
437         enforces them to perform the http call. Hint: call present method with
438         success=None to get a non-performed ResponseManager object.
439         """
440         assert isinstance(method, str) or isinstance(method, unicode)
441         assert method
442         assert isinstance(path, str) or isinstance(path, unicode)
443         try:
444             headers = dict(self.headers)
445             headers.update(async_headers)
446             params = dict(self.params)
447             params.update(async_params)
448             success = kwargs.pop('success', 200)
449             data = kwargs.pop('data', None)
450             headers.setdefault('X-Auth-Token', self.token)
451             if 'json' in kwargs:
452                 data = dumps(kwargs.pop('json'))
453                 headers.setdefault('Content-Type', 'application/json')
454             if data:
455                 headers.setdefault('Content-Length', '%s' % len(data))
456
457             plog = '\t[%s]' if self.LOG_PID else ''
458             sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
459             req = RequestManager(
460                 method, self.base_url, path,
461                 data=data, headers=headers, params=params)
462             #  req.log()
463             r = ResponseManager(
464                 req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
465             r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
466                 self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
467             r._token = headers['X-Auth-Token']
468         finally:
469             self.headers = dict()
470             self.params = dict()
471
472         if success is not None:
473             # Success can either be an int or a collection
474             success = (success,) if isinstance(success, int) else success
475             if r.status_code not in success:
476                 self._raise_for_status(r)
477         return r
478
479     def delete(self, path, **kwargs):
480         return self.request('delete', path, **kwargs)
481
482     def get(self, path, **kwargs):
483         return self.request('get', path, **kwargs)
484
485     def head(self, path, **kwargs):
486         return self.request('head', path, **kwargs)
487
488     def post(self, path, **kwargs):
489         return self.request('post', path, **kwargs)
490
491     def put(self, path, **kwargs):
492         return self.request('put', path, **kwargs)
493
494     def copy(self, path, **kwargs):
495         return self.request('copy', path, **kwargs)
496
497     def move(self, path, **kwargs):
498         return self.request('move', path, **kwargs)