Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 01d14153

History | View | Annotate | Download (15.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103

    
104

    
105
class RequestManager(Logged):
106
    """Handle http request information"""
107

    
108
    def _connection_info(self, url, path, params={}):
109
        """ Set self.url to scheme://netloc/?params
110
        :param url: (str or unicode) The service url
111

112
        :param path: (str or unicode) The service path (url/path)
113

114
        :param params: (dict) Parameters to add to final url
115

116
        :returns: (scheme, netloc)
117
        """
118
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
119
        url += '' if url.endswith('/') else '/'
120
        if path:
121
            url += _encode(path[1:] if path.startswith('/') else path)
122
        delim = '?'
123
        for key, val in params.items():
124
            val = _encode(val)
125
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126
            delim = '&'
127
        parsed = urlparse(url)
128
        self.url = url
129
        self.path = parsed.path or '/'
130
        if parsed.query:
131
            self.path += '?%s' % parsed.query
132
        return (parsed.scheme, parsed.netloc)
133

    
134
    def __init__(
135
            self, method, url, path,
136
            data=None, headers={}, params={}):
137
        method = method.upper()
138
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
139
        if headers:
140
            assert isinstance(headers, dict)
141
        self.headers = dict(headers)
142
        self.method, self.data = method, data
143
        self.scheme, self.netloc = self._connection_info(url, path, params)
144

    
145
    def dump_log(self):
146
        sendlog.info('%s %s://%s%s\t[%s]' % (
147
            self.method, self.scheme, self.netloc, self.path, self))
148
        for key, val in self.headers.items():
149
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
150
                continue
151
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
152
        if self.data:
153
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
154
            if self.LOG_DATA:
155
                sendlog.info(self.data)
156
        else:
157
            sendlog.info('data size:0\t[%s]' % self)
158
        sendlog.info('')
159

    
160
    def perform(self, conn):
161
        """
162
        :param conn: (httplib connection object)
163

164
        :returns: (HTTPResponse)
165
        """
166
        conn.request(
167
            method=str(self.method.upper()),
168
            url=str(self.path),
169
            headers=self.headers,
170
            body=self.data)
171
        self.dump_log()
172
        keep_trying = TIMEOUT
173
        while keep_trying > 0:
174
            try:
175
                return conn.getresponse()
176
            except ResponseNotReady:
177
                wait = 0.03 * random()
178
                sleep(wait)
179
                keep_trying -= wait
180
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
181
        recvlog.debug(logmsg)
182
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
183

    
184

    
185
class ResponseManager(Logged):
186
    """Manage the http request and handle the response data, headers, etc."""
187

    
188
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
189
        """
190
        :param request: (RequestManager)
191

192
        :param poolsize: (int) the size of the connection pool
193

194
        :param connection_retry_limit: (int)
195
        """
196
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
197
        self.request = request
198
        self._request_performed = False
199
        self.poolsize = poolsize
200

    
201
    def _get_response(self):
202
        if self._request_performed:
203
            return
204

    
205
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
206
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
207
            try:
208
                with PooledHTTPConnection(
209
                        self.request.netloc, self.request.scheme,
210
                        **pool_kw) as connection:
211
                    self.request.LOG_TOKEN = self.LOG_TOKEN
212
                    self.request.LOG_DATA = self.LOG_DATA
213
                    r = self.request.perform(connection)
214
                    recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
215
                        self, r, self.request))
216
                    self._request_performed = True
217
                    self._status_code, self._status = r.status, unquote(
218
                        r.reason)
219
                    recvlog.info(
220
                        '%d %s\t[p: %s]' % (
221
                            self.status_code, self.status, self))
222
                    self._headers = dict()
223
                    for k, v in r.getheaders():
224
                        if (not self.LOG_TOKEN) and (
225
                                k.lower() == 'x-auth-token'):
226
                            continue
227
                        v = unquote(v)
228
                        self._headers[k] = v
229
                        recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
230
                    self._content = r.read()
231
                    recvlog.info('data size: %s\t[p: %s]' % (
232
                        len(self._content) if self._content else 0,
233
                        self))
234
                    if self.LOG_DATA and self._content:
235
                        recvlog.info('%s\t[p: %s]' % (self._content, self))
236
                break
237
            except Exception as err:
238
                if isinstance(err, HTTPException):
239
                    if retries >= self.CONNECTION_TRY_LIMIT:
240
                        raise ClientError(
241
                            'Connection to %s failed %s times (%s: %s )' % (
242
                                self.request.url, retries, type(err), err))
243
                else:
244
                    from traceback import format_stack
245
                    recvlog.debug(
246
                        '\n'.join(['%s' % type(err)] + format_stack()))
247
                    raise ClientError(
248
                        'Failed while http-connecting to %s (%s)' % (
249
                            self.request.url,
250
                            err))
251

    
252
    @property
253
    def status_code(self):
254
        self._get_response()
255
        return self._status_code
256

    
257
    @property
258
    def status(self):
259
        self._get_response()
260
        return self._status
261

    
262
    @property
263
    def headers(self):
264
        self._get_response()
265
        return self._headers
266

    
267
    @property
268
    def content(self):
269
        self._get_response()
270
        return self._content
271

    
272
    @property
273
    def text(self):
274
        """
275
        :returns: (str) content
276
        """
277
        self._get_response()
278
        return '%s' % self._content
279

    
280
    @property
281
    def json(self):
282
        """
283
        :returns: (dict) squeezed from json-formated content
284
        """
285
        self._get_response()
286
        try:
287
            return loads(self._content)
288
        except ValueError as err:
289
            raise ClientError('Response not formated in JSON - %s' % err)
290

    
291

    
292
class SilentEvent(Thread):
293
    """Thread-run method(*args, **kwargs)"""
294
    def __init__(self, method, *args, **kwargs):
295
        super(self.__class__, self).__init__()
296
        self.method = method
297
        self.args = args
298
        self.kwargs = kwargs
299

    
300
    @property
301
    def exception(self):
302
        return getattr(self, '_exception', False)
303

    
304
    @property
305
    def value(self):
306
        return getattr(self, '_value', None)
307

    
308
    def run(self):
309
        try:
310
            self._value = self.method(*(self.args), **(self.kwargs))
311
        except Exception as e:
312
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
313
                self,
314
                type(e),
315
                e.status if isinstance(e, ClientError) else '',
316
                e))
317
            self._exception = e
318

    
319

    
320
class Client(object):
321

    
322
    MAX_THREADS = 7
323
    DATE_FORMATS = [
324
        '%a %b %d %H:%M:%S %Y',
325
        '%A, %d-%b-%y %H:%M:%S GMT',
326
        '%a, %d %b %Y %H:%M:%S GMT']
327
    LOG_TOKEN = False
328
    LOG_DATA = False
329
    CONNECTION_RETRY_LIMIT = 0
330

    
331
    def __init__(self, base_url, token):
332
        assert base_url, 'No base_url for client %s' % self
333
        self.base_url = base_url
334
        self.token = token
335
        self.headers, self.params = dict(), dict()
336

    
337
    def _init_thread_limit(self, limit=1):
338
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
339
        self._thread_limit = limit
340
        self._elapsed_old = 0.0
341
        self._elapsed_new = 0.0
342

    
343
    def _watch_thread_limit(self, threadlist):
344
        self._thread_limit = getattr(self, '_thread_limit', 1)
345
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
346
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
347
        recvlog.debug('# running threads: %s' % len(threadlist))
348
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
349
                self._thread_limit < self.MAX_THREADS):
350
            self._thread_limit += 1
351
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
352
            self._thread_limit -= 1
353

    
354
        self._elapsed_old = self._elapsed_new
355
        if len(threadlist) >= self._thread_limit:
356
            self._elapsed_new = 0.0
357
            for thread in threadlist:
358
                begin_time = time()
359
                thread.join()
360
                self._elapsed_new += time() - begin_time
361
            self._elapsed_new = self._elapsed_new / len(threadlist)
362
            return []
363
        return threadlist
364

    
365
    def _raise_for_status(self, r):
366
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
367
        status_msg = getattr(r, 'status', None) or ''
368
        try:
369
            message = '%s %s\n' % (status_msg, r.text)
370
        except:
371
            message = '%s %s\n' % (status_msg, r)
372
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
373
        raise ClientError(message, status=status)
374

    
375
    def set_header(self, name, value, iff=True):
376
        """Set a header 'name':'value'"""
377
        if value is not None and iff:
378
            self.headers[name] = unicode(value)
379

    
380
    def set_param(self, name, value=None, iff=True):
381
        if iff:
382
            self.params[name] = unicode(value)
383

    
384
    def request(
385
            self, method, path,
386
            async_headers=dict(), async_params=dict(),
387
            **kwargs):
388
        """Commit an HTTP request to base_url/path
389
        Requests are commited to and performed by Request/ResponseManager
390
        These classes perform a lazy http request. Present method, by default,
391
        enforces them to perform the http call. Hint: call present method with
392
        success=None to get a non-performed ResponseManager object.
393
        """
394
        assert isinstance(method, str) or isinstance(method, unicode)
395
        assert method
396
        assert isinstance(path, str) or isinstance(path, unicode)
397
        try:
398
            headers = dict(self.headers)
399
            headers.update(async_headers)
400
            params = dict(self.params)
401
            params.update(async_params)
402
            success = kwargs.pop('success', 200)
403
            data = kwargs.pop('data', None)
404
            headers.setdefault('X-Auth-Token', self.token)
405
            if 'json' in kwargs:
406
                data = dumps(kwargs.pop('json'))
407
                headers.setdefault('Content-Type', 'application/json')
408
            if data:
409
                headers.setdefault('Content-Length', '%s' % len(data))
410

    
411
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
412
            req = RequestManager(
413
                method, self.base_url, path,
414
                data=data, headers=headers, params=params)
415
            #  req.log()
416
            r = ResponseManager(
417
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
418
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
419
        finally:
420
            self.headers = dict()
421
            self.params = dict()
422

    
423
        if success is not None:
424
            # Success can either be an int or a collection
425
            success = (success,) if isinstance(success, int) else success
426
            if r.status_code not in success:
427
                self._raise_for_status(r)
428
        return r
429

    
430
    def delete(self, path, **kwargs):
431
        return self.request('delete', path, **kwargs)
432

    
433
    def get(self, path, **kwargs):
434
        return self.request('get', path, **kwargs)
435

    
436
    def head(self, path, **kwargs):
437
        return self.request('head', path, **kwargs)
438

    
439
    def post(self, path, **kwargs):
440
        return self.request('post', path, **kwargs)
441

    
442
    def put(self, path, **kwargs):
443
        return self.request('put', path, **kwargs)
444

    
445
    def copy(self, path, **kwargs):
446
        return self.request('copy', path, **kwargs)
447

    
448
    def move(self, path, **kwargs):
449
        return self.request('move', path, **kwargs)