Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ a8262d20

History | View | Annotate | Download (16.2 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104

    
105

    
106
class RequestManager(Logged):
107
    """Handle http request information"""
108

    
109
    def _connection_info(self, url, path, params={}):
110
        """ Set self.url to scheme://netloc/?params
111
        :param url: (str or unicode) The service url
112

113
        :param path: (str or unicode) The service path (url/path)
114

115
        :param params: (dict) Parameters to add to final url
116

117
        :returns: (scheme, netloc)
118
        """
119
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
120
        url += '' if url.endswith('/') else '/'
121
        if path:
122
            url += _encode(path[1:] if path.startswith('/') else path)
123
        delim = '?'
124
        for key, val in params.items():
125
            val = _encode(u'%s' % val)
126
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
127
            delim = '&'
128
        parsed = urlparse(url)
129
        self.url = url
130
        self.path = parsed.path or '/'
131
        if parsed.query:
132
            self.path += '?%s' % parsed.query
133
        return (parsed.scheme, parsed.netloc)
134

    
135
    def __init__(
136
            self, method, url, path,
137
            data=None, headers={}, params={}):
138
        method = method.upper()
139
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
140
        if headers:
141
            assert isinstance(headers, dict)
142
        self.headers = dict(headers)
143
        self.method, self.data = method, data
144
        self.scheme, self.netloc = self._connection_info(url, path, params)
145

    
146
    def dump_log(self):
147
        plog = '\t[%s]' if self.LOG_PID else ''
148
        sendlog.info('- -  -   -     -        -             -')
149
        sendlog.info('%s %s://%s%s%s' % (
150
            self.method, self.scheme, self.netloc, self.path, plog))
151
        for key, val in self.headers.items():
152
            #if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
153
            #    continue
154
            show = (key.lower() != 'x-auth-token') or self.LOG_TOKEN
155
            sendlog.info('  %s: %s%s' % (key, val if show else '', plog))
156
        if self.data:
157
            sendlog.info('data size:%s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data)
160
        else:
161
            sendlog.info('data size:0%s' % plog)
162
        sendlog.info('')
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        conn.request(
171
            method=str(self.method.upper()),
172
            url=str(self.path),
173
            headers=self.headers,
174
            body=self.data)
175
        self.dump_log()
176
        keep_trying = TIMEOUT
177
        while keep_trying > 0:
178
            try:
179
                return conn.getresponse()
180
            except ResponseNotReady:
181
                wait = 0.03 * random()
182
                sleep(wait)
183
                keep_trying -= wait
184
        plog = '\t[%s]' if self.LOG_PID else ''
185
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
186
        recvlog.debug(logmsg)
187
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
188

    
189

    
190
class ResponseManager(Logged):
191
    """Manage the http request and handle the response data, headers, etc."""
192

    
193
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
194
        """
195
        :param request: (RequestManager)
196

197
        :param poolsize: (int) the size of the connection pool
198

199
        :param connection_retry_limit: (int)
200
        """
201
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
202
        self.request = request
203
        self._request_performed = False
204
        self.poolsize = poolsize
205

    
206
    def _get_response(self):
207
        if self._request_performed:
208
            return
209

    
210
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
211
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
212
            try:
213
                with PooledHTTPConnection(
214
                        self.request.netloc, self.request.scheme,
215
                        **pool_kw) as connection:
216
                    self.request.LOG_TOKEN = self.LOG_TOKEN
217
                    self.request.LOG_DATA = self.LOG_DATA
218
                    self.request.LOG_PID = self.LOG_PID
219
                    r = self.request.perform(connection)
220
                    plog = ''
221
                    if self.LOG_PID:
222
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
223
                            self, r, self.request))
224
                        plog = '\t[%s]' % self
225
                    self._request_performed = True
226
                    self._status_code, self._status = r.status, unquote(
227
                        r.reason)
228
                    recvlog.info(
229
                        '%d %s%s' % (
230
                            self.status_code, self.status, plog))
231
                    self._headers = dict()
232
                    for k, v in r.getheaders():
233
                        if (not self.LOG_TOKEN) and (
234
                                k.lower() == 'x-auth-token'):
235
                            continue
236
                        v = unquote(v)
237
                        self._headers[k] = v
238
                        recvlog.info('  %s: %s%s' % (k, v, plog))
239
                    self._content = r.read()
240
                    recvlog.info('data size: %s%s' % (
241
                        len(self._content) if self._content else 0, plog))
242
                    if self.LOG_DATA and self._content:
243
                        recvlog.info('%s%s' % (self._content, plog))
244
                    sendlog.info('-             -        -     -   -  - -')
245
                break
246
            except Exception as err:
247
                if isinstance(err, HTTPException):
248
                    if retries >= self.CONNECTION_TRY_LIMIT:
249
                        raise ClientError(
250
                            'Connection to %s failed %s times (%s: %s )' % (
251
                                self.request.url, retries, type(err), err))
252
                else:
253
                    from traceback import format_stack
254
                    recvlog.debug(
255
                        '\n'.join(['%s' % type(err)] + format_stack()))
256
                    raise ClientError(
257
                        'Failed while http-connecting to %s (%s)' % (
258
                            self.request.url, err))
259

    
260
    @property
261
    def status_code(self):
262
        self._get_response()
263
        return self._status_code
264

    
265
    @property
266
    def status(self):
267
        self._get_response()
268
        return self._status
269

    
270
    @property
271
    def headers(self):
272
        self._get_response()
273
        return self._headers
274

    
275
    @property
276
    def content(self):
277
        self._get_response()
278
        return self._content
279

    
280
    @property
281
    def text(self):
282
        """
283
        :returns: (str) content
284
        """
285
        self._get_response()
286
        return '%s' % self._content
287

    
288
    @property
289
    def json(self):
290
        """
291
        :returns: (dict) squeezed from json-formated content
292
        """
293
        self._get_response()
294
        try:
295
            return loads(self._content)
296
        except ValueError as err:
297
            raise ClientError('Response not formated in JSON - %s' % err)
298

    
299

    
300
class SilentEvent(Thread):
301
    """Thread-run method(*args, **kwargs)"""
302
    def __init__(self, method, *args, **kwargs):
303
        super(self.__class__, self).__init__()
304
        self.method = method
305
        self.args = args
306
        self.kwargs = kwargs
307

    
308
    @property
309
    def exception(self):
310
        return getattr(self, '_exception', False)
311

    
312
    @property
313
    def value(self):
314
        return getattr(self, '_value', None)
315

    
316
    def run(self):
317
        try:
318
            self._value = self.method(*(self.args), **(self.kwargs))
319
        except Exception as e:
320
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
321
                self,
322
                type(e),
323
                e.status if isinstance(e, ClientError) else '',
324
                e))
325
            self._exception = e
326

    
327

    
328
class Client(Logged):
329

    
330
    MAX_THREADS = 7
331
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
332
    CONNECTION_RETRY_LIMIT = 0
333

    
334
    def __init__(self, base_url, token):
335
        assert base_url, 'No base_url for client %s' % self
336
        self.base_url = base_url
337
        self.token = token
338
        self.headers, self.params = dict(), dict()
339

    
340
    def _init_thread_limit(self, limit=1):
341
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
342
        self._thread_limit = limit
343
        self._elapsed_old = 0.0
344
        self._elapsed_new = 0.0
345

    
346
    def _watch_thread_limit(self, threadlist):
347
        self._thread_limit = getattr(self, '_thread_limit', 1)
348
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
349
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
350
        recvlog.debug('# running threads: %s' % len(threadlist))
351
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
352
                self._thread_limit < self.MAX_THREADS):
353
            self._thread_limit += 1
354
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
355
            self._thread_limit -= 1
356

    
357
        self._elapsed_old = self._elapsed_new
358
        if len(threadlist) >= self._thread_limit:
359
            self._elapsed_new = 0.0
360
            for thread in threadlist:
361
                begin_time = time()
362
                thread.join()
363
                self._elapsed_new += time() - begin_time
364
            self._elapsed_new = self._elapsed_new / len(threadlist)
365
            return []
366
        return threadlist
367

    
368
    def _raise_for_status(self, r):
369
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
370
        status_msg = getattr(r, 'status', None) or ''
371
        try:
372
            message = '%s %s\n' % (status_msg, r.text)
373
        except:
374
            message = '%s %s\n' % (status_msg, r)
375
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
376
        raise ClientError(message, status=status)
377

    
378
    def set_header(self, name, value, iff=True):
379
        """Set a header 'name':'value'"""
380
        if value is not None and iff:
381
            self.headers[name] = unicode(value)
382

    
383
    def set_param(self, name, value=None, iff=True):
384
        if iff:
385
            self.params[name] = unicode(value)
386

    
387
    def request(
388
            self, method, path,
389
            async_headers=dict(), async_params=dict(),
390
            **kwargs):
391
        """Commit an HTTP request to base_url/path
392
        Requests are commited to and performed by Request/ResponseManager
393
        These classes perform a lazy http request. Present method, by default,
394
        enforces them to perform the http call. Hint: call present method with
395
        success=None to get a non-performed ResponseManager object.
396
        """
397
        assert isinstance(method, str) or isinstance(method, unicode)
398
        assert method
399
        assert isinstance(path, str) or isinstance(path, unicode)
400
        try:
401
            headers = dict(self.headers)
402
            headers.update(async_headers)
403
            params = dict(self.params)
404
            params.update(async_params)
405
            success = kwargs.pop('success', 200)
406
            data = kwargs.pop('data', None)
407
            headers.setdefault('X-Auth-Token', self.token)
408
            if 'json' in kwargs:
409
                data = dumps(kwargs.pop('json'))
410
                headers.setdefault('Content-Type', 'application/json')
411
            if data:
412
                headers.setdefault('Content-Length', '%s' % len(data))
413

    
414
            plog = '\t[%s]' if self.LOG_PID else ''
415
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
416
            req = RequestManager(
417
                method, self.base_url, path,
418
                data=data, headers=headers, params=params)
419
            #  req.log()
420
            r = ResponseManager(
421
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
422
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
423
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
424
        finally:
425
            self.headers = dict()
426
            self.params = dict()
427

    
428
        if success is not None:
429
            # Success can either be an int or a collection
430
            success = (success,) if isinstance(success, int) else success
431
            if r.status_code not in success:
432
                self._raise_for_status(r)
433
        return r
434

    
435
    def delete(self, path, **kwargs):
436
        return self.request('delete', path, **kwargs)
437

    
438
    def get(self, path, **kwargs):
439
        return self.request('get', path, **kwargs)
440

    
441
    def head(self, path, **kwargs):
442
        return self.request('head', path, **kwargs)
443

    
444
    def post(self, path, **kwargs):
445
        return self.request('post', path, **kwargs)
446

    
447
    def put(self, path, **kwargs):
448
        return self.request('put', path, **kwargs)
449

    
450
    def copy(self, path, **kwargs):
451
        return self.request('copy', path, **kwargs)
452

    
453
    def move(self, path, **kwargs):
454
        return self.request('move', path, **kwargs)