Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ b773795c

History | View | Annotate | Download (15.7 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103

    
104

    
105
class RequestManager(Logged):
106
    """Handle http request information"""
107

    
108
    def _connection_info(self, url, path, params={}):
109
        """ Set self.url to scheme://netloc/?params
110
        :param url: (str or unicode) The service url
111

112
        :param path: (str or unicode) The service path (url/path)
113

114
        :param params: (dict) Parameters to add to final url
115

116
        :returns: (scheme, netloc)
117
        """
118
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
119
        url += '' if url.endswith('/') else '/'
120
        if path:
121
            url += _encode(path[1:] if path.startswith('/') else path)
122
        delim = '?'
123
        for key, val in params.items():
124
            val = _encode(val)
125
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126
            delim = '&'
127
        parsed = urlparse(url)
128
        self.url = url
129
        self.path = parsed.path or '/'
130
        if parsed.query:
131
            self.path += '?%s' % parsed.query
132
        return (parsed.scheme, parsed.netloc)
133

    
134
    def __init__(
135
            self, method, url, path,
136
            data=None, headers={}, params={}):
137
        method = method.upper()
138
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
139
        if headers:
140
            assert isinstance(headers, dict)
141
        self.headers = dict(headers)
142
        self.method, self.data = method, data
143
        self.scheme, self.netloc = self._connection_info(url, path, params)
144

    
145
    def dump_log(self):
146
        sendlog.info('%s %s://%s%s\t[%s]' % (
147
            self.method, self.scheme, self.netloc, self.path, self))
148
        for key, val in self.headers.items():
149
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
150
                continue
151
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
152
        if self.data:
153
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
154
            if self.LOG_DATA:
155
                sendlog.info(self.data)
156
        else:
157
            sendlog.info('data size:0\t[%s]' % self)
158
        sendlog.info('')
159

    
160
    def perform(self, conn):
161
        """
162
        :param conn: (httplib connection object)
163

164
        :returns: (HTTPResponse)
165
        """
166
        conn.request(
167
            method=str(self.method.upper()),
168
            url=str(self.path),
169
            headers=self.headers,
170
            body=self.data)
171
        self.dump_log()
172
        keep_trying = TIMEOUT
173
        while keep_trying > 0:
174
            try:
175
                return conn.getresponse()
176
            except ResponseNotReady:
177
                wait = 0.03 * random()
178
                sleep(wait)
179
                keep_trying -= wait
180
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
181
        recvlog.debug(logmsg)
182
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
183

    
184

    
185
class ResponseManager(Logged):
186
    """Manage the http request and handle the response data, headers, etc."""
187

    
188
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
189
        """
190
        :param request: (RequestManager)
191

192
        :param poolsize: (int) the size of the connection pool
193

194
        :param connection_retry_limit: (int)
195
        """
196
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
197
        self.request = request
198
        self._request_performed = False
199
        self.poolsize = poolsize
200

    
201
    def _get_response(self):
202
        if self._request_performed:
203
            return
204

    
205
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
206
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
207
            try:
208
                with PooledHTTPConnection(
209
                        self.request.netloc, self.request.scheme,
210
                        **pool_kw) as connection:
211
                    self.request.LOG_TOKEN = self.LOG_TOKEN
212
                    self.request.LOG_DATA = self.LOG_DATA
213
                    r = self.request.perform(connection)
214
                    recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
215
                        self, r, self.request))
216
                    self._request_performed = True
217
                    self._status_code, self._status = r.status, unquote(
218
                        r.reason)
219
                    recvlog.info(
220
                        '%d %s\t[p: %s]' % (
221
                            self.status_code, self.status, self))
222
                    self._headers = dict()
223
                    for k, v in r.getheaders():
224
                        if (not self.LOG_TOKEN) and (
225
                                k.lower() == 'x-auth-token'):
226
                            continue
227
                        v = unquote(v)
228
                        self._headers[k] = v
229
                        recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
230
                    self._content = r.read()
231
                    recvlog.info('data size: %s\t[p: %s]' % (
232
                        len(self._content) if self._content else 0,
233
                        self))
234
                    if self.LOG_DATA and self._content:
235
                        recvlog.info('%s\t[p: %s]' % (self._content, self))
236
                break
237
            except Exception as err:
238
                if isinstance(err, HTTPException):
239
                    if retries >= self.CONNECTION_TRY_LIMIT:
240
                        raise ClientError(
241
                            'Connection to %s failed %s times (%s: %s )' % (
242
                                self.request.url, retries, type(err), err))
243
                else:
244
                    from traceback import format_stack
245
                    recvlog.debug(
246
                        '\n'.join(['%s' % type(err)] + format_stack()))
247
                    raise ClientError(
248
                        'Failed while http-connecting to %s (%s)' % (
249
                            self.request.url,
250
                            err))
251

    
252
    @property
253
    def status_code(self):
254
        self._get_response()
255
        return self._status_code
256

    
257
    @property
258
    def status(self):
259
        self._get_response()
260
        return self._status
261

    
262
    @property
263
    def headers(self):
264
        self._get_response()
265
        return self._headers
266

    
267
    @property
268
    def content(self):
269
        self._get_response()
270
        return self._content
271

    
272
    @property
273
    def text(self):
274
        """
275
        :returns: (str) content
276
        """
277
        self._get_response()
278
        return '%s' % self._content
279

    
280
    @property
281
    def json(self):
282
        """
283
        :returns: (dict) squeezed from json-formated content
284
        """
285
        self._get_response()
286
        try:
287
            return loads(self._content)
288
        except ValueError as err:
289
            raise ClientError('Response not formated in JSON - %s' % err)
290

    
291

    
292
class SilentEvent(Thread):
293
    """Thread-run method(*args, **kwargs)"""
294
    def __init__(self, method, *args, **kwargs):
295
        super(self.__class__, self).__init__()
296
        self.method = method
297
        self.args = args
298
        self.kwargs = kwargs
299

    
300
    @property
301
    def exception(self):
302
        return getattr(self, '_exception', False)
303

    
304
    @property
305
    def value(self):
306
        return getattr(self, '_value', None)
307

    
308
    def run(self):
309
        try:
310
            self._value = self.method(*(self.args), **(self.kwargs))
311
        except Exception as e:
312
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
313
                self,
314
                type(e),
315
                e.status if isinstance(e, ClientError) else '',
316
                e))
317
            self._exception = e
318

    
319

    
320
class Client(object):
321

    
322
    MAX_THREADS = 7
323
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
324
    LOG_TOKEN = False
325
    LOG_DATA = True
326
    CONNECTION_RETRY_LIMIT = 0
327

    
328
    def __init__(self, base_url, token):
329
        assert base_url, 'No base_url for client %s' % self
330
        self.base_url = base_url
331
        self.token = token
332
        self.headers, self.params = dict(), dict()
333

    
334
    def _init_thread_limit(self, limit=1):
335
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
336
        self._thread_limit = limit
337
        self._elapsed_old = 0.0
338
        self._elapsed_new = 0.0
339

    
340
    def _watch_thread_limit(self, threadlist):
341
        self._thread_limit = getattr(self, '_thread_limit', 1)
342
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
343
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
344
        recvlog.debug('# running threads: %s' % len(threadlist))
345
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
346
                self._thread_limit < self.MAX_THREADS):
347
            self._thread_limit += 1
348
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
349
            self._thread_limit -= 1
350

    
351
        self._elapsed_old = self._elapsed_new
352
        if len(threadlist) >= self._thread_limit:
353
            self._elapsed_new = 0.0
354
            for thread in threadlist:
355
                begin_time = time()
356
                thread.join()
357
                self._elapsed_new += time() - begin_time
358
            self._elapsed_new = self._elapsed_new / len(threadlist)
359
            return []
360
        return threadlist
361

    
362
    def _raise_for_status(self, r):
363
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
364
        status_msg = getattr(r, 'status', None) or ''
365
        try:
366
            message = '%s %s\n' % (status_msg, r.text)
367
        except:
368
            message = '%s %s\n' % (status_msg, r)
369
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
370
        raise ClientError(message, status=status)
371

    
372
    def set_header(self, name, value, iff=True):
373
        """Set a header 'name':'value'"""
374
        if value is not None and iff:
375
            self.headers[name] = unicode(value)
376

    
377
    def set_param(self, name, value=None, iff=True):
378
        if iff:
379
            self.params[name] = unicode(value)
380

    
381
    def request(
382
            self, method, path,
383
            async_headers=dict(), async_params=dict(),
384
            **kwargs):
385
        """Commit an HTTP request to base_url/path
386
        Requests are commited to and performed by Request/ResponseManager
387
        These classes perform a lazy http request. Present method, by default,
388
        enforces them to perform the http call. Hint: call present method with
389
        success=None to get a non-performed ResponseManager object.
390
        """
391
        assert isinstance(method, str) or isinstance(method, unicode)
392
        assert method
393
        assert isinstance(path, str) or isinstance(path, unicode)
394
        try:
395
            headers = dict(self.headers)
396
            headers.update(async_headers)
397
            params = dict(self.params)
398
            params.update(async_params)
399
            success = kwargs.pop('success', 200)
400
            data = kwargs.pop('data', None)
401
            headers.setdefault('X-Auth-Token', self.token)
402
            if 'json' in kwargs:
403
                data = dumps(kwargs.pop('json'))
404
                headers.setdefault('Content-Type', 'application/json')
405
            if data:
406
                headers.setdefault('Content-Length', '%s' % len(data))
407

    
408
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
409
            req = RequestManager(
410
                method, self.base_url, path,
411
                data=data, headers=headers, params=params)
412
            #  req.log()
413
            r = ResponseManager(
414
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
415
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
416
        finally:
417
            self.headers = dict()
418
            self.params = dict()
419

    
420
        if success is not None:
421
            # Success can either be an int or a collection
422
            success = (success,) if isinstance(success, int) else success
423
            if r.status_code not in success:
424
                self._raise_for_status(r)
425
        return r
426

    
427
    def delete(self, path, **kwargs):
428
        return self.request('delete', path, **kwargs)
429

    
430
    def get(self, path, **kwargs):
431
        return self.request('get', path, **kwargs)
432

    
433
    def head(self, path, **kwargs):
434
        return self.request('head', path, **kwargs)
435

    
436
    def post(self, path, **kwargs):
437
        return self.request('post', path, **kwargs)
438

    
439
    def put(self, path, **kwargs):
440
        return self.request('put', path, **kwargs)
441

    
442
    def copy(self, path, **kwargs):
443
        return self.request('copy', path, **kwargs)
444

    
445
    def move(self, path, **kwargs):
446
        return self.request('move', path, **kwargs)