Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 545c6c29

History | View | Annotate | Download (14.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103

    
104

    
105
class RequestManager(Logged):
106
    """Handle http request information"""
107

    
108
    def _connection_info(self, url, path, params={}):
109
        """ Set self.url to scheme://netloc/?params
110
        :param url: (str or unicode) The service url
111

112
        :param path: (str or unicode) The service path (url/path)
113

114
        :param params: (dict) Parameters to add to final url
115

116
        :returns: (scheme, netloc)
117
        """
118
        url = _encode(url) if url else 'http://127.0.0.1/'
119
        url += '' if url.endswith('/') else '/'
120
        if path:
121
            url += _encode(path[1:] if path.startswith('/') else path)
122
        delim = '?'
123
        for key, val in params.items():
124
            val = _encode(val)
125
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126
            delim = '&'
127
        parsed = urlparse(url)
128
        self.url = url
129
        self.path = parsed.path or '/'
130
        if parsed.query:
131
            self.path += '?%s' % parsed.query
132
        return (parsed.scheme, parsed.netloc)
133

    
134
    def __init__(
135
            self, method, url, path,
136
            data=None, headers={}, params={}):
137
        method = method.upper()
138
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
139
        if headers:
140
            assert isinstance(headers, dict)
141
        self.headers = dict(headers)
142
        self.method, self.data = method, data
143
        self.scheme, self.netloc = self._connection_info(url, path, params)
144

    
145
    def dump_log(self):
146
        sendlog.info('%s %s://%s%s\t[%s]' % (
147
            self.method,
148
            self.scheme,
149
            self.netloc,
150
            self.path,
151
            self))
152
        for key, val in self.headers.items():
153
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
154
                continue
155
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
156
        if self.data:
157
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data)
160
        else:
161
            sendlog.info('data size:0\t[%s]' % self)
162
        sendlog.info('')
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        conn.request(
171
            method=str(self.method.upper()),
172
            url=str(self.path),
173
            headers=self.headers,
174
            body=self.data)
175
        self.dump_log()
176
        keep_trying = TIMEOUT
177
        while keep_trying > 0:
178
            try:
179
                return conn.getresponse()
180
            except ResponseNotReady:
181
                wait = 0.03 * random()
182
                sleep(wait)
183
                keep_trying -= wait
184
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
185
        recvlog.debug(logmsg)
186
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
187

    
188

    
189
class ResponseManager(Logged):
190
    """Manage the http request and handle the response data, headers, etc."""
191

    
192
    def __init__(self, request, poolsize=None):
193
        """
194
        :param request: (RequestManager)
195
        """
196
        self.request = request
197
        self._request_performed = False
198
        self.poolsize = poolsize
199

    
200
    def _get_response(self):
201
        if self._request_performed:
202
            return
203

    
204
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
205
        try:
206
            with PooledHTTPConnection(
207
                    self.request.netloc, self.request.scheme,
208
                    **pool_kw) as connection:
209
                self.request.LOG_TOKEN = self.LOG_TOKEN
210
                self.request.LOG_DATA = self.LOG_DATA
211
                r = self.request.perform(connection)
212
                recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
213
                    self, r, self.request))
214
                self._request_performed = True
215
                self._status_code, self._status = r.status, unquote(r.reason)
216
                recvlog.info(
217
                    '%d %s\t[p: %s]' % (self.status_code, self.status, self))
218
                self._headers = dict()
219
                for k, v in r.getheaders():
220
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
221
                        continue
222
                    v = unquote(v)
223
                    self._headers[k] = v
224
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
225
                self._content = r.read()
226
                recvlog.info('data size: %s\t[p: %s]' % (
227
                    len(self._content) if self._content else 0,
228
                    self))
229
                if self.LOG_DATA and self._content:
230
                    recvlog.info('%s\t[p: %s]' % (self._content, self))
231
        except Exception as err:
232
            from traceback import format_stack
233
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
234
            raise ClientError(
235
                'Failed while http-connecting to %s (%s)' % (
236
                    self.request.url,
237
                    err))
238

    
239
    @property
240
    def status_code(self):
241
        self._get_response()
242
        return self._status_code
243

    
244
    @property
245
    def status(self):
246
        self._get_response()
247
        return self._status
248

    
249
    @property
250
    def headers(self):
251
        self._get_response()
252
        return self._headers
253

    
254
    @property
255
    def content(self):
256
        self._get_response()
257
        return self._content
258

    
259
    @property
260
    def text(self):
261
        """
262
        :returns: (str) content
263
        """
264
        self._get_response()
265
        return '%s' % self._content
266

    
267
    @property
268
    def json(self):
269
        """
270
        :returns: (dict) squeezed from json-formated content
271
        """
272
        self._get_response()
273
        try:
274
            return loads(self._content)
275
        except ValueError as err:
276
            raise ClientError('Response not formated in JSON - %s' % err)
277

    
278

    
279
class SilentEvent(Thread):
280
    """Thread-run method(*args, **kwargs)"""
281
    def __init__(self, method, *args, **kwargs):
282
        super(self.__class__, self).__init__()
283
        self.method = method
284
        self.args = args
285
        self.kwargs = kwargs
286

    
287
    @property
288
    def exception(self):
289
        return getattr(self, '_exception', False)
290

    
291
    @property
292
    def value(self):
293
        return getattr(self, '_value', None)
294

    
295
    def run(self):
296
        try:
297
            self._value = self.method(*(self.args), **(self.kwargs))
298
        except Exception as e:
299
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
300
                self,
301
                type(e),
302
                e.status if isinstance(e, ClientError) else '',
303
                e))
304
            self._exception = e
305

    
306

    
307
class Client(object):
308

    
309
    MAX_THREADS = 7
310
    DATE_FORMATS = [
311
        '%a %b %d %H:%M:%S %Y',
312
        '%A, %d-%b-%y %H:%M:%S GMT',
313
        '%a, %d %b %Y %H:%M:%S GMT']
314
    LOG_TOKEN = False
315
    LOG_DATA = False
316

    
317
    def __init__(self, base_url, token):
318
        self.base_url = base_url
319
        self.token = token
320
        self.headers, self.params = dict(), dict()
321

    
322
    def _init_thread_limit(self, limit=1):
323
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
324
        self._thread_limit = limit
325
        self._elapsed_old = 0.0
326
        self._elapsed_new = 0.0
327

    
328
    def _watch_thread_limit(self, threadlist):
329
        self._thread_limit = getattr(self, '_thread_limit', 1)
330
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
331
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
332
        recvlog.debug('# running threads: %s' % len(threadlist))
333
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
334
                self._thread_limit < self.MAX_THREADS):
335
            self._thread_limit += 1
336
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
337
            self._thread_limit -= 1
338

    
339
        self._elapsed_old = self._elapsed_new
340
        if len(threadlist) >= self._thread_limit:
341
            self._elapsed_new = 0.0
342
            for thread in threadlist:
343
                begin_time = time()
344
                thread.join()
345
                self._elapsed_new += time() - begin_time
346
            self._elapsed_new = self._elapsed_new / len(threadlist)
347
            return []
348
        return threadlist
349

    
350
    def _raise_for_status(self, r):
351
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
352
        status_msg = getattr(r, 'status', None) or ''
353
        try:
354
            message = '%s %s\n' % (status_msg, r.text)
355
        except:
356
            message = '%s %s\n' % (status_msg, r)
357
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
358
        raise ClientError(message, status=status)
359

    
360
    def set_header(self, name, value, iff=True):
361
        """Set a header 'name':'value'"""
362
        if value is not None and iff:
363
            self.headers[name] = value
364

    
365
    def set_param(self, name, value=None, iff=True):
366
        if iff:
367
            self.params[name] = value
368

    
369
    def request(
370
            self, method, path,
371
            async_headers=dict(), async_params=dict(),
372
            **kwargs):
373
        """Commit an HTTP request to base_url/path
374
        Requests are commited to and performed by Request/ResponseManager
375
        These classes perform a lazy http request. Present method, by default,
376
        enforces them to perform the http call. Hint: call present method with
377
        success=None to get a non-performed ResponseManager object.
378
        """
379
        assert isinstance(method, str) or isinstance(method, unicode)
380
        assert method
381
        assert isinstance(path, str) or isinstance(path, unicode)
382
        try:
383
            headers = dict(self.headers)
384
            headers.update(async_headers)
385
            params = dict(self.params)
386
            params.update(async_params)
387
            success = kwargs.pop('success', 200)
388
            data = kwargs.pop('data', None)
389
            headers.setdefault('X-Auth-Token', self.token)
390
            if 'json' in kwargs:
391
                data = dumps(kwargs.pop('json'))
392
                headers.setdefault('Content-Type', 'application/json')
393
            if data:
394
                headers.setdefault('Content-Length', '%s' % len(data))
395

    
396
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
397
            req = RequestManager(
398
                method, self.base_url, path,
399
                data=data, headers=headers, params=params)
400
            #  req.log()
401
            r = ResponseManager(req)
402
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
403
        finally:
404
            self.headers = dict()
405
            self.params = dict()
406

    
407
        if success is not None:
408
            # Success can either be an int or a collection
409
            success = (success,) if isinstance(success, int) else success
410
            if r.status_code not in success:
411
                self._raise_for_status(r)
412
        return r
413

    
414
    def delete(self, path, **kwargs):
415
        return self.request('delete', path, **kwargs)
416

    
417
    def get(self, path, **kwargs):
418
        return self.request('get', path, **kwargs)
419

    
420
    def head(self, path, **kwargs):
421
        return self.request('head', path, **kwargs)
422

    
423
    def post(self, path, **kwargs):
424
        return self.request('post', path, **kwargs)
425

    
426
    def put(self, path, **kwargs):
427
        return self.request('put', path, **kwargs)
428

    
429
    def copy(self, path, **kwargs):
430
        return self.request('copy', path, **kwargs)
431

    
432
    def move(self, path, **kwargs):
433
        return self.request('move', path, **kwargs)