Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 8c54338a

History | View | Annotate | Download (14.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103

    
104

    
105
class RequestManager(Logged):
106
    """Handle http request information"""
107

    
108
    def _connection_info(self, url, path, params={}):
109
        """ Set self.url to scheme://netloc/?params
110
        :param url: (str or unicode) The service url
111

112
        :param path: (str or unicode) The service path (url/path)
113

114
        :param params: (dict) Parameters to add to final url
115

116
        :returns: (scheme, netloc)
117
        """
118
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
119
        url += '' if url.endswith('/') else '/'
120
        if path:
121
            url += _encode(path[1:] if path.startswith('/') else path)
122
        delim = '?'
123
        for key, val in params.items():
124
            val = _encode(val)
125
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126
            delim = '&'
127
        parsed = urlparse(url)
128
        self.url = url
129
        self.path = parsed.path or '/'
130
        if parsed.query:
131
            self.path += '?%s' % parsed.query
132
        return (parsed.scheme, parsed.netloc)
133

    
134
    def __init__(
135
            self, method, url, path,
136
            data=None, headers={}, params={}):
137
        method = method.upper()
138
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
139
        if headers:
140
            assert isinstance(headers, dict)
141
        self.headers = dict(headers)
142
        self.method, self.data = method, data
143
        self.scheme, self.netloc = self._connection_info(url, path, params)
144

    
145
    def dump_log(self):
146
        sendlog.info('%s %s://%s%s\t[%s]' % (
147
            self.method,
148
            self.scheme,
149
            self.netloc,
150
            self.path,
151
            self))
152
        for key, val in self.headers.items():
153
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
154
                continue
155
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
156
        if self.data:
157
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data)
160
        else:
161
            sendlog.info('data size:0\t[%s]' % self)
162
        sendlog.info('')
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        conn.request(
171
            method=str(self.method.upper()),
172
            url=str(self.path),
173
            headers=self.headers,
174
            body=self.data)
175
        self.dump_log()
176
        keep_trying = TIMEOUT
177
        while keep_trying > 0:
178
            try:
179
                return conn.getresponse()
180
            except ResponseNotReady:
181
                wait = 0.03 * random()
182
                sleep(wait)
183
                keep_trying -= wait
184
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
185
        recvlog.debug(logmsg)
186
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
187

    
188

    
189
class ResponseManager(Logged):
190
    """Manage the http request and handle the response data, headers, etc."""
191

    
192
    def __init__(self, request, poolsize=None):
193
        """
194
        :param request: (RequestManager)
195
        """
196
        self.request = request
197
        self._request_performed = False
198
        self.poolsize = poolsize
199

    
200
    def _get_response(self):
201
        if self._request_performed:
202
            return
203

    
204
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
205
        try:
206
            with PooledHTTPConnection(
207
                    self.request.netloc, self.request.scheme,
208
                    **pool_kw) as connection:
209
                self.request.LOG_TOKEN = self.LOG_TOKEN
210
                self.request.LOG_DATA = self.LOG_DATA
211
                r = self.request.perform(connection)
212
                recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
213
                    self, r, self.request))
214
                self._request_performed = True
215
                self._status_code, self._status = r.status, unquote(r.reason)
216
                recvlog.info(
217
                    '%d %s\t[p: %s]' % (self.status_code, self.status, self))
218
                self._headers = dict()
219
                for k, v in r.getheaders():
220
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
221
                        continue
222
                    v = unquote(v)
223
                    self._headers[k] = v
224
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
225
                self._content = r.read()
226
                recvlog.info('data size: %s\t[p: %s]' % (
227
                    len(self._content) if self._content else 0,
228
                    self))
229
                if self.LOG_DATA and self._content:
230
                    recvlog.info('%s\t[p: %s]' % (self._content, self))
231
        except Exception as err:
232
            from traceback import format_stack
233
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
234
            raise ClientError(
235
                'Failed while http-connecting to %s (%s)' % (
236
                    self.request.url,
237
                    err))
238

    
239
    @property
240
    def status_code(self):
241
        self._get_response()
242
        return self._status_code
243

    
244
    @property
245
    def status(self):
246
        self._get_response()
247
        return self._status
248

    
249
    @property
250
    def headers(self):
251
        self._get_response()
252
        return self._headers
253

    
254
    @property
255
    def content(self):
256
        self._get_response()
257
        return self._content
258

    
259
    @property
260
    def text(self):
261
        """
262
        :returns: (str) content
263
        """
264
        self._get_response()
265
        return '%s' % self._content
266

    
267
    @property
268
    def json(self):
269
        """
270
        :returns: (dict) squeezed from json-formated content
271
        """
272
        self._get_response()
273
        try:
274
            return loads(self._content)
275
        except ValueError as err:
276
            raise ClientError('Response not formated in JSON - %s' % err)
277

    
278

    
279
class SilentEvent(Thread):
280
    """Thread-run method(*args, **kwargs)"""
281
    def __init__(self, method, *args, **kwargs):
282
        super(self.__class__, self).__init__()
283
        self.method = method
284
        self.args = args
285
        self.kwargs = kwargs
286

    
287
    @property
288
    def exception(self):
289
        return getattr(self, '_exception', False)
290

    
291
    @property
292
    def value(self):
293
        return getattr(self, '_value', None)
294

    
295
    def run(self):
296
        try:
297
            self._value = self.method(*(self.args), **(self.kwargs))
298
        except Exception as e:
299
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
300
                self,
301
                type(e),
302
                e.status if isinstance(e, ClientError) else '',
303
                e))
304
            self._exception = e
305

    
306

    
307
class Client(object):
308

    
309
    MAX_THREADS = 7
310
    DATE_FORMATS = [
311
        '%a %b %d %H:%M:%S %Y',
312
        '%A, %d-%b-%y %H:%M:%S GMT',
313
        '%a, %d %b %Y %H:%M:%S GMT']
314
    LOG_TOKEN = False
315
    LOG_DATA = False
316

    
317
    def __init__(self, base_url, token):
318
        assert base_url, 'No base_url for client %s' % self
319
        self.base_url = base_url
320
        self.token = token
321
        self.headers, self.params = dict(), dict()
322

    
323
    def _init_thread_limit(self, limit=1):
324
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
325
        self._thread_limit = limit
326
        self._elapsed_old = 0.0
327
        self._elapsed_new = 0.0
328

    
329
    def _watch_thread_limit(self, threadlist):
330
        self._thread_limit = getattr(self, '_thread_limit', 1)
331
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
332
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
333
        recvlog.debug('# running threads: %s' % len(threadlist))
334
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
335
                self._thread_limit < self.MAX_THREADS):
336
            self._thread_limit += 1
337
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
338
            self._thread_limit -= 1
339

    
340
        self._elapsed_old = self._elapsed_new
341
        if len(threadlist) >= self._thread_limit:
342
            self._elapsed_new = 0.0
343
            for thread in threadlist:
344
                begin_time = time()
345
                thread.join()
346
                self._elapsed_new += time() - begin_time
347
            self._elapsed_new = self._elapsed_new / len(threadlist)
348
            return []
349
        return threadlist
350

    
351
    def _raise_for_status(self, r):
352
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
353
        status_msg = getattr(r, 'status', None) or ''
354
        try:
355
            message = '%s %s\n' % (status_msg, r.text)
356
        except:
357
            message = '%s %s\n' % (status_msg, r)
358
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
359
        raise ClientError(message, status=status)
360

    
361
    def set_header(self, name, value, iff=True):
362
        """Set a header 'name':'value'"""
363
        if value is not None and iff:
364
            self.headers[name] = value
365

    
366
    def set_param(self, name, value=None, iff=True):
367
        if iff:
368
            self.params[name] = value
369

    
370
    def request(
371
            self, method, path,
372
            async_headers=dict(), async_params=dict(),
373
            **kwargs):
374
        """Commit an HTTP request to base_url/path
375
        Requests are commited to and performed by Request/ResponseManager
376
        These classes perform a lazy http request. Present method, by default,
377
        enforces them to perform the http call. Hint: call present method with
378
        success=None to get a non-performed ResponseManager object.
379
        """
380
        assert isinstance(method, str) or isinstance(method, unicode)
381
        assert method
382
        assert isinstance(path, str) or isinstance(path, unicode)
383
        try:
384
            headers = dict(self.headers)
385
            headers.update(async_headers)
386
            params = dict(self.params)
387
            params.update(async_params)
388
            success = kwargs.pop('success', 200)
389
            data = kwargs.pop('data', None)
390
            headers.setdefault('X-Auth-Token', self.token)
391
            if 'json' in kwargs:
392
                data = dumps(kwargs.pop('json'))
393
                headers.setdefault('Content-Type', 'application/json')
394
            if data:
395
                headers.setdefault('Content-Length', '%s' % len(data))
396

    
397
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
398
            req = RequestManager(
399
                method, self.base_url, path,
400
                data=data, headers=headers, params=params)
401
            #  req.log()
402
            r = ResponseManager(req)
403
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
404
        finally:
405
            self.headers = dict()
406
            self.params = dict()
407

    
408
        if success is not None:
409
            # Success can either be an int or a collection
410
            success = (success,) if isinstance(success, int) else success
411
            if r.status_code not in success:
412
                self._raise_for_status(r)
413
        return r
414

    
415
    def delete(self, path, **kwargs):
416
        return self.request('delete', path, **kwargs)
417

    
418
    def get(self, path, **kwargs):
419
        return self.request('get', path, **kwargs)
420

    
421
    def head(self, path, **kwargs):
422
        return self.request('head', path, **kwargs)
423

    
424
    def post(self, path, **kwargs):
425
        return self.request('post', path, **kwargs)
426

    
427
    def put(self, path, **kwargs):
428
        return self.request('put', path, **kwargs)
429

    
430
    def copy(self, path, **kwargs):
431
        return self.request('copy', path, **kwargs)
432

    
433
    def move(self, path, **kwargs):
434
        return self.request('move', path, **kwargs)