Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 9dc724e5

History | View | Annotate | Download (14.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42

    
43
from objpool.http import PooledHTTPConnection
44

    
45
from kamaki.logger import get_logger
46

    
47

    
48
TIMEOUT = 60.0   # seconds
49
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
50

    
51
log = get_logger(__name__)
52
sendlog = get_logger('%s.send' % __name__)
53
recvlog = get_logger('%s.recv' % __name__)
54

    
55

    
56
def _encode(v):
57
    if v and isinstance(v, unicode):
58
        return quote(v.encode('utf-8'))
59
    return v
60

    
61

    
62
class ClientError(Exception):
63
    def __init__(self, message, status=0, details=None):
64
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
65
            message,
66
            status,
67
            details))
68
        try:
69
            message += '' if message and message[-1] == '\n' else '\n'
70
            serv_stat, sep, new_msg = message.partition('{')
71
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
72
            json_msg = loads(new_msg)
73
            key = json_msg.keys()[0]
74
            serv_stat = serv_stat.strip()
75

    
76
            json_msg = json_msg[key]
77
            message = '%s %s (%s)\n' % (
78
                serv_stat,
79
                key,
80
                json_msg['message']) if (
81
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
82
            status = json_msg.get('code', status)
83
            if 'details' in json_msg:
84
                if not details:
85
                    details = []
86
                if not isinstance(details, list):
87
                    details = [details]
88
                if json_msg['details']:
89
                    details.append(json_msg['details'])
90
        except Exception:
91
            pass
92
        finally:
93
            while message.endswith('\n\n'):
94
                message = message[:-1]
95
            super(ClientError, self).__init__(message)
96
            self.status = status if isinstance(status, int) else 0
97
            self.details = details if details else []
98

    
99

    
100
class Logged(object):
101

    
102
    LOG_TOKEN = False
103
    LOG_DATA = False
104

    
105

    
106
class RequestManager(Logged):
107
    """Handle http request information"""
108

    
109
    def _connection_info(self, url, path, params={}):
110
        """ Set self.url to scheme://netloc/?params
111
        :param url: (str or unicode) The service url
112

113
        :param path: (str or unicode) The service path (url/path)
114

115
        :param params: (dict) Parameters to add to final url
116

117
        :returns: (scheme, netloc)
118
        """
119
        url = _encode(url) if url else 'http://127.0.0.1/'
120
        url += '' if url.endswith('/') else '/'
121
        if path:
122
            url += _encode(path[1:] if path.startswith('/') else path)
123
        delim = '?'
124
        for key, val in params.items():
125
            val = _encode(val)
126
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
127
            delim = '&'
128
        parsed = urlparse(url)
129
        self.url = url
130
        self.path = parsed.path or '/'
131
        if parsed.query:
132
            self.path += '?%s' % parsed.query
133
        return (parsed.scheme, parsed.netloc)
134

    
135
    def __init__(
136
            self, method, url, path,
137
            data=None, headers={}, params={}):
138
        method = method.upper()
139
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
140
        if headers:
141
            assert isinstance(headers, dict)
142
        self.headers = dict(headers)
143
        self.method, self.data = method, data
144
        self.scheme, self.netloc = self._connection_info(url, path, params)
145

    
146
    def dump_log(self):
147
        sendlog.info('%s %s://%s%s\t[%s]' % (
148
            self.method,
149
            self.scheme,
150
            self.netloc,
151
            self.path,
152
            self))
153
        for key, val in self.headers.items():
154
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
155
                continue
156
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
157
        if self.data:
158
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
159
            if self.LOG_DATA:
160
                sendlog.info(self.data)
161
        else:
162
            sendlog.info('data size:0\t[%s]' % self)
163
        sendlog.info('')
164

    
165
    def perform(self, conn):
166
        """
167
        :param conn: (httplib connection object)
168

169
        :returns: (HTTPResponse)
170
        """
171
        conn.request(
172
            method=str(self.method.upper()),
173
            url=str(self.path),
174
            headers=self.headers,
175
            body=self.data)
176
        self.dump_log()
177
        keep_trying = TIMEOUT
178
        while keep_trying > 0:
179
            try:
180
                return conn.getresponse()
181
            except ResponseNotReady:
182
                wait = 0.03 * random()
183
                sleep(wait)
184
                keep_trying -= wait
185
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
186
        recvlog.debug(logmsg)
187
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
188

    
189

    
190
class ResponseManager(Logged):
191
    """Manage the http request and handle the response data, headers, etc."""
192

    
193
    def __init__(self, request, poolsize=None):
194
        """
195
        :param request: (RequestManager)
196
        """
197
        self.request = request
198
        self._request_performed = False
199
        self.poolsize = poolsize
200

    
201
    def _get_response(self):
202
        if self._request_performed:
203
            return
204

    
205
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
206
        try:
207
            with PooledHTTPConnection(
208
                    self.request.netloc, self.request.scheme,
209
                    **pool_kw) as connection:
210
                self.request.LOG_TOKEN = self.LOG_TOKEN
211
                self.request.LOG_DATA = self.LOG_DATA
212
                r = self.request.perform(connection)
213
                recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
214
                    self, r, self.request))
215
                self._request_performed = True
216
                self._status_code, self._status = r.status, r.reason
217
                recvlog.info(
218
                    '%d %s\t[p: %s]' % (self.status_code, self.status, self))
219
                self._headers = dict()
220
                for k, v in r.getheaders():
221
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
222
                        continue
223
                    self._headers[k] = v
224
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
225
                self._content = r.read()
226
                recvlog.info('data size: %s\t[p: %s]' % (
227
                    len(self._content) if self._content else 0,
228
                    self))
229
                if self.LOG_DATA and self._content:
230
                    recvlog.info('%s\t[p: %s]' % (self._content, self))
231
        except Exception as err:
232
            from traceback import format_stack
233
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
234
            raise ClientError(
235
                'Failed while http-connecting to %s (%s)' % (
236
                    self.request.url,
237
                    err))
238

    
239
    @property
240
    def status_code(self):
241
        self._get_response()
242
        return self._status_code
243

    
244
    @property
245
    def status(self):
246
        self._get_response()
247
        return self._status
248

    
249
    @property
250
    def headers(self):
251
        self._get_response()
252
        return self._headers
253

    
254
    @property
255
    def content(self):
256
        self._get_response()
257
        return self._content
258

    
259
    @property
260
    def text(self):
261
        """
262
        :returns: (str) content
263
        """
264
        self._get_response()
265
        return '%s' % self._content
266

    
267
    @property
268
    def json(self):
269
        """
270
        :returns: (dict) squeezed from json-formated content
271
        """
272
        self._get_response()
273
        try:
274
            return loads(self._content)
275
        except ValueError as err:
276
            raise ClientError('Response not formated in JSON - %s' % err)
277

    
278

    
279
class SilentEvent(Thread):
280
    """Thread-run method(*args, **kwargs)"""
281
    def __init__(self, method, *args, **kwargs):
282
        super(self.__class__, self).__init__()
283
        self.method = method
284
        self.args = args
285
        self.kwargs = kwargs
286

    
287
    @property
288
    def exception(self):
289
        return getattr(self, '_exception', False)
290

    
291
    @property
292
    def value(self):
293
        return getattr(self, '_value', None)
294

    
295
    def run(self):
296
        try:
297
            self._value = self.method(*(self.args), **(self.kwargs))
298
        except Exception as e:
299
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
300
                self,
301
                type(e),
302
                e.status if isinstance(e, ClientError) else '',
303
                e))
304
            self._exception = e
305

    
306

    
307
class Client(object):
308

    
309
    MAX_THREADS = 7
310
    DATE_FORMATS = [
311
        '%a %b %d %H:%M:%S %Y',
312
        '%A, %d-%b-%y %H:%M:%S GMT',
313
        '%a, %d %b %Y %H:%M:%S GMT']
314
    LOG_TOKEN = False
315
    LOG_DATA = False
316

    
317
    def __init__(self, base_url, token):
318
        self.base_url = base_url
319
        self.token = token
320
        self.headers, self.params = dict(), dict()
321

    
322
    def _init_thread_limit(self, limit=1):
323
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
324
        self._thread_limit = limit
325
        self._elapsed_old = 0.0
326
        self._elapsed_new = 0.0
327

    
328
    def _watch_thread_limit(self, threadlist):
329
        self._thread_limit = getattr(self, '_thread_limit', 1)
330
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
331
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
332
        recvlog.debug('# running threads: %s' % len(threadlist))
333
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
334
                self._thread_limit < self.MAX_THREADS):
335
            self._thread_limit += 1
336
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
337
            self._thread_limit -= 1
338

    
339
        self._elapsed_old = self._elapsed_new
340
        if len(threadlist) >= self._thread_limit:
341
            self._elapsed_new = 0.0
342
            for thread in threadlist:
343
                begin_time = time()
344
                thread.join()
345
                self._elapsed_new += time() - begin_time
346
            self._elapsed_new = self._elapsed_new / len(threadlist)
347
            return []
348
        return threadlist
349

    
350
    def _raise_for_status(self, r):
351
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
352
        status_msg = getattr(r, 'status', None) or ''
353
        try:
354
            message = '%s %s\n' % (status_msg, r.text)
355
        except:
356
            message = '%s %s\n' % (status_msg, r)
357
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
358
        raise ClientError(message, status=status)
359

    
360
    def set_header(self, name, value, iff=True):
361
        """Set a header 'name':'value'"""
362
        if value is not None and iff:
363
            self.headers[name] = value
364

    
365
    def set_param(self, name, value=None, iff=True):
366
        if iff:
367
            self.params[name] = value
368

    
369
    def request(
370
            self, method, path,
371
            async_headers=dict(), async_params=dict(),
372
            **kwargs):
373
        """Commit an HTTP request to base_url/path
374
        Requests are commited to and performed by Request/ResponseManager
375
        These classes perform a lazy http request. Present method, by default,
376
        enforces them to perform the http call. Hint: call present method with
377
        success=None to get a non-performed ResponseManager object.
378
        """
379
        assert isinstance(method, str) or isinstance(method, unicode)
380
        assert method
381
        assert isinstance(path, str) or isinstance(path, unicode)
382
        try:
383
            headers = dict(self.headers)
384
            headers.update(async_headers)
385
            params = dict(self.params)
386
            params.update(async_params)
387
            success = kwargs.pop('success', 200)
388
            data = kwargs.pop('data', None)
389
            headers.setdefault('X-Auth-Token', self.token)
390
            if 'json' in kwargs:
391
                data = dumps(kwargs.pop('json'))
392
                headers.setdefault('Content-Type', 'application/json')
393
            if data:
394
                headers.setdefault('Content-Length', '%s' % len(data))
395

    
396
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
397
            req = RequestManager(
398
                method, self.base_url, path,
399
                data=data, headers=headers, params=params)
400
            #  req.log()
401
            r = ResponseManager(req)
402
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
403
        finally:
404
            self.headers = dict()
405
            self.params = dict()
406

    
407
        if success is not None:
408
            # Success can either be an int or a collection
409
            success = (success,) if isinstance(success, int) else success
410
            if r.status_code not in success:
411
                self._raise_for_status(r)
412
        return r
413

    
414
    def delete(self, path, **kwargs):
415
        return self.request('delete', path, **kwargs)
416

    
417
    def get(self, path, **kwargs):
418
        return self.request('get', path, **kwargs)
419

    
420
    def head(self, path, **kwargs):
421
        return self.request('head', path, **kwargs)
422

    
423
    def post(self, path, **kwargs):
424
        return self.request('post', path, **kwargs)
425

    
426
    def put(self, path, **kwargs):
427
        return self.request('put', path, **kwargs)
428

    
429
    def copy(self, path, **kwargs):
430
        return self.request('copy', path, **kwargs)
431

    
432
    def move(self, path, **kwargs):
433
        return self.request('move', path, **kwargs)