Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 38db356b

History | View | Annotate | Download (16.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104

    
105

    
106
class RequestManager(Logged):
107
    """Handle http request information"""
108

    
109
    def _connection_info(self, url, path, params={}):
110
        """ Set self.url to scheme://netloc/?params
111
        :param url: (str or unicode) The service url
112

113
        :param path: (str or unicode) The service path (url/path)
114

115
        :param params: (dict) Parameters to add to final url
116

117
        :returns: (scheme, netloc)
118
        """
119
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
120
        url += '' if url.endswith('/') else '/'
121
        if path:
122
            url += _encode(path[1:] if path.startswith('/') else path)
123
        delim = '?'
124
        for key, val in params.items():
125
            val = _encode(u'%s' % val)
126
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
127
            delim = '&'
128
        parsed = urlparse(url)
129
        self.url = url
130
        self.path = parsed.path or '/'
131
        if parsed.query:
132
            self.path += '?%s' % parsed.query
133
        return (parsed.scheme, parsed.netloc)
134

    
135
    def __init__(
136
            self, method, url, path,
137
            data=None, headers={}, params={}):
138
        method = method.upper()
139
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
140
        if headers:
141
            assert isinstance(headers, dict)
142
        self.headers = dict(headers)
143
        self.method, self.data = method, data
144
        self.scheme, self.netloc = self._connection_info(url, path, params)
145

    
146
    def dump_log(self):
147
        plog = '\t[%s]' if self.LOG_PID else ''
148
        sendlog.info('- -  -   -     -        -             -')
149
        sendlog.info('%s %s://%s%s%s' % (
150
            self.method, self.scheme, self.netloc, self.path, plog))
151
        for key, val in self.headers.items():
152
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
153
                continue
154
            sendlog.info('  %s: %s%s' % (key, val, plog))
155
        if self.data:
156
            sendlog.info('data size:%s%s' % (len(self.data), plog))
157
            if self.LOG_DATA:
158
                sendlog.info(self.data)
159
        else:
160
            sendlog.info('data size:0%s' % plog)
161
        sendlog.info('')
162

    
163
    def perform(self, conn):
164
        """
165
        :param conn: (httplib connection object)
166

167
        :returns: (HTTPResponse)
168
        """
169
        conn.request(
170
            method=str(self.method.upper()),
171
            url=str(self.path),
172
            headers=self.headers,
173
            body=self.data)
174
        self.dump_log()
175
        keep_trying = TIMEOUT
176
        while keep_trying > 0:
177
            try:
178
                return conn.getresponse()
179
            except ResponseNotReady:
180
                wait = 0.03 * random()
181
                sleep(wait)
182
                keep_trying -= wait
183
        plog = '\t[%s]' if self.LOG_PID else ''
184
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
185
        recvlog.debug(logmsg)
186
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
187

    
188

    
189
class ResponseManager(Logged):
190
    """Manage the http request and handle the response data, headers, etc."""
191

    
192
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
193
        """
194
        :param request: (RequestManager)
195

196
        :param poolsize: (int) the size of the connection pool
197

198
        :param connection_retry_limit: (int)
199
        """
200
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
201
        self.request = request
202
        self._request_performed = False
203
        self.poolsize = poolsize
204

    
205
    def _get_response(self):
206
        if self._request_performed:
207
            return
208

    
209
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
210
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
211
            try:
212
                with PooledHTTPConnection(
213
                        self.request.netloc, self.request.scheme,
214
                        **pool_kw) as connection:
215
                    self.request.LOG_TOKEN = self.LOG_TOKEN
216
                    self.request.LOG_DATA = self.LOG_DATA
217
                    self.request.LOG_PID = self.LOG_PID
218
                    r = self.request.perform(connection)
219
                    plog = ''
220
                    if self.LOG_PID:
221
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
222
                            self, r, self.request))
223
                        plog = '\t[%s]' % self
224
                    self._request_performed = True
225
                    self._status_code, self._status = r.status, unquote(
226
                        r.reason)
227
                    recvlog.info(
228
                        '%d %s%s' % (
229
                            self.status_code, self.status, plog))
230
                    self._headers = dict()
231
                    for k, v in r.getheaders():
232
                        if (not self.LOG_TOKEN) and (
233
                                k.lower() == 'x-auth-token'):
234
                            continue
235
                        v = unquote(v)
236
                        self._headers[k] = v
237
                        recvlog.info('  %s: %s%s' % (k, v, plog))
238
                    self._content = r.read()
239
                    recvlog.info('data size: %s%s' % (
240
                        len(self._content) if self._content else 0, plog))
241
                    if self.LOG_DATA and self._content:
242
                        recvlog.info('%s%s' % (self._content, plog))
243
                    sendlog.info('-             -        -     -   -  - -')
244
                break
245
            except Exception as err:
246
                if isinstance(err, HTTPException):
247
                    if retries >= self.CONNECTION_TRY_LIMIT:
248
                        raise ClientError(
249
                            'Connection to %s failed %s times (%s: %s )' % (
250
                                self.request.url, retries, type(err), err))
251
                else:
252
                    from traceback import format_stack
253
                    recvlog.debug(
254
                        '\n'.join(['%s' % type(err)] + format_stack()))
255
                    raise ClientError(
256
                        'Failed while http-connecting to %s (%s)' % (
257
                            self.request.url, err))
258

    
259
    @property
260
    def status_code(self):
261
        self._get_response()
262
        return self._status_code
263

    
264
    @property
265
    def status(self):
266
        self._get_response()
267
        return self._status
268

    
269
    @property
270
    def headers(self):
271
        self._get_response()
272
        return self._headers
273

    
274
    @property
275
    def content(self):
276
        self._get_response()
277
        return self._content
278

    
279
    @property
280
    def text(self):
281
        """
282
        :returns: (str) content
283
        """
284
        self._get_response()
285
        return '%s' % self._content
286

    
287
    @property
288
    def json(self):
289
        """
290
        :returns: (dict) squeezed from json-formated content
291
        """
292
        self._get_response()
293
        try:
294
            return loads(self._content)
295
        except ValueError as err:
296
            raise ClientError('Response not formated in JSON - %s' % err)
297

    
298

    
299
class SilentEvent(Thread):
300
    """Thread-run method(*args, **kwargs)"""
301
    def __init__(self, method, *args, **kwargs):
302
        super(self.__class__, self).__init__()
303
        self.method = method
304
        self.args = args
305
        self.kwargs = kwargs
306

    
307
    @property
308
    def exception(self):
309
        return getattr(self, '_exception', False)
310

    
311
    @property
312
    def value(self):
313
        return getattr(self, '_value', None)
314

    
315
    def run(self):
316
        try:
317
            self._value = self.method(*(self.args), **(self.kwargs))
318
        except Exception as e:
319
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
320
                self,
321
                type(e),
322
                e.status if isinstance(e, ClientError) else '',
323
                e))
324
            self._exception = e
325

    
326

    
327
class Client(Logged):
328

    
329
    MAX_THREADS = 7
330
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
331
    CONNECTION_RETRY_LIMIT = 0
332

    
333
    def __init__(self, base_url, token):
334
        assert base_url, 'No base_url for client %s' % self
335
        self.base_url = base_url
336
        self.token = token
337
        self.headers, self.params = dict(), dict()
338

    
339
    def _init_thread_limit(self, limit=1):
340
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
341
        self._thread_limit = limit
342
        self._elapsed_old = 0.0
343
        self._elapsed_new = 0.0
344

    
345
    def _watch_thread_limit(self, threadlist):
346
        self._thread_limit = getattr(self, '_thread_limit', 1)
347
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
348
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
349
        recvlog.debug('# running threads: %s' % len(threadlist))
350
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
351
                self._thread_limit < self.MAX_THREADS):
352
            self._thread_limit += 1
353
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
354
            self._thread_limit -= 1
355

    
356
        self._elapsed_old = self._elapsed_new
357
        if len(threadlist) >= self._thread_limit:
358
            self._elapsed_new = 0.0
359
            for thread in threadlist:
360
                begin_time = time()
361
                thread.join()
362
                self._elapsed_new += time() - begin_time
363
            self._elapsed_new = self._elapsed_new / len(threadlist)
364
            return []
365
        return threadlist
366

    
367
    def _raise_for_status(self, r):
368
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
369
        status_msg = getattr(r, 'status', None) or ''
370
        try:
371
            message = '%s %s\n' % (status_msg, r.text)
372
        except:
373
            message = '%s %s\n' % (status_msg, r)
374
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
375
        raise ClientError(message, status=status)
376

    
377
    def set_header(self, name, value, iff=True):
378
        """Set a header 'name':'value'"""
379
        if value is not None and iff:
380
            self.headers[name] = unicode(value)
381

    
382
    def set_param(self, name, value=None, iff=True):
383
        if iff:
384
            self.params[name] = unicode(value)
385

    
386
    def request(
387
            self, method, path,
388
            async_headers=dict(), async_params=dict(),
389
            **kwargs):
390
        """Commit an HTTP request to base_url/path
391
        Requests are commited to and performed by Request/ResponseManager
392
        These classes perform a lazy http request. Present method, by default,
393
        enforces them to perform the http call. Hint: call present method with
394
        success=None to get a non-performed ResponseManager object.
395
        """
396
        assert isinstance(method, str) or isinstance(method, unicode)
397
        assert method
398
        assert isinstance(path, str) or isinstance(path, unicode)
399
        try:
400
            headers = dict(self.headers)
401
            headers.update(async_headers)
402
            params = dict(self.params)
403
            params.update(async_params)
404
            success = kwargs.pop('success', 200)
405
            data = kwargs.pop('data', None)
406
            headers.setdefault('X-Auth-Token', self.token)
407
            if 'json' in kwargs:
408
                data = dumps(kwargs.pop('json'))
409
                headers.setdefault('Content-Type', 'application/json')
410
            if data:
411
                headers.setdefault('Content-Length', '%s' % len(data))
412

    
413
            plog = '\t[%s]' if self.LOG_PID else ''
414
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
415
            req = RequestManager(
416
                method, self.base_url, path,
417
                data=data, headers=headers, params=params)
418
            #  req.log()
419
            r = ResponseManager(
420
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
421
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
422
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
423
        finally:
424
            self.headers = dict()
425
            self.params = dict()
426

    
427
        if success is not None:
428
            # Success can either be an int or a collection
429
            success = (success,) if isinstance(success, int) else success
430
            if r.status_code not in success:
431
                self._raise_for_status(r)
432
        return r
433

    
434
    def delete(self, path, **kwargs):
435
        return self.request('delete', path, **kwargs)
436

    
437
    def get(self, path, **kwargs):
438
        return self.request('get', path, **kwargs)
439

    
440
    def head(self, path, **kwargs):
441
        return self.request('head', path, **kwargs)
442

    
443
    def post(self, path, **kwargs):
444
        return self.request('post', path, **kwargs)
445

    
446
    def put(self, path, **kwargs):
447
        return self.request('put', path, **kwargs)
448

    
449
    def copy(self, path, **kwargs):
450
        return self.request('copy', path, **kwargs)
451

    
452
    def move(self, path, **kwargs):
453
        return self.request('move', path, **kwargs)