Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 201baa17

History | View | Annotate | Download (14.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103

    
104

    
105
class RequestManager(Logged):
106
    """Handle http request information"""
107

    
108
    def _connection_info(self, url, path, params={}):
109
        """ Set self.url to scheme://netloc/?params
110
        :param url: (str or unicode) The service url
111

112
        :param path: (str or unicode) The service path (url/path)
113

114
        :param params: (dict) Parameters to add to final url
115

116
        :returns: (scheme, netloc)
117
        """
118
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
119
        url += '' if url.endswith('/') else '/'
120
        if path:
121
            url += _encode(path[1:] if path.startswith('/') else path)
122
        delim = '?'
123
        for key, val in params.items():
124
            val = _encode(val)
125
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
126
            delim = '&'
127
        parsed = urlparse(url)
128
        self.url = url
129
        self.path = parsed.path or '/'
130
        if parsed.query:
131
            self.path += '?%s' % parsed.query
132
        return (parsed.scheme, parsed.netloc)
133

    
134
    def __init__(
135
            self, method, url, path,
136
            data=None, headers={}, params={}):
137
        method = method.upper()
138
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
139
        if headers:
140
            assert isinstance(headers, dict)
141
        self.headers = dict(headers)
142
        self.method, self.data = method, data
143
        self.scheme, self.netloc = self._connection_info(url, path, params)
144

    
145
    def dump_log(self):
146
        sendlog.info('%s %s://%s%s\t[%s]' % (
147
            self.method, self.scheme, self.netloc, self.path, self))
148
        for key, val in self.headers.items():
149
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
150
                continue
151
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
152
        if self.data:
153
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
154
            if self.LOG_DATA:
155
                sendlog.info(self.data)
156
        else:
157
            sendlog.info('data size:0\t[%s]' % self)
158
        sendlog.info('')
159

    
160
    def perform(self, conn):
161
        """
162
        :param conn: (httplib connection object)
163

164
        :returns: (HTTPResponse)
165
        """
166
        conn.request(
167
            method=str(self.method.upper()),
168
            url=str(self.path),
169
            headers=self.headers,
170
            body=self.data)
171
        self.dump_log()
172
        keep_trying = TIMEOUT
173
        while keep_trying > 0:
174
            try:
175
                return conn.getresponse()
176
            except ResponseNotReady:
177
                wait = 0.03 * random()
178
                sleep(wait)
179
                keep_trying -= wait
180
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
181
        recvlog.debug(logmsg)
182
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
183

    
184

    
185
class ResponseManager(Logged):
186
    """Manage the http request and handle the response data, headers, etc."""
187

    
188
    def __init__(self, request, poolsize=None):
189
        """
190
        :param request: (RequestManager)
191
        """
192
        self.request = request
193
        self._request_performed = False
194
        self.poolsize = poolsize
195

    
196
    def _get_response(self):
197
        if self._request_performed:
198
            return
199

    
200
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
201
        try:
202
            with PooledHTTPConnection(
203
                    self.request.netloc, self.request.scheme,
204
                    **pool_kw) as connection:
205
                self.request.LOG_TOKEN = self.LOG_TOKEN
206
                self.request.LOG_DATA = self.LOG_DATA
207
                r = self.request.perform(connection)
208
                recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
209
                    self, r, self.request))
210
                self._request_performed = True
211
                self._status_code, self._status = r.status, unquote(r.reason)
212
                recvlog.info(
213
                    '%d %s\t[p: %s]' % (self.status_code, self.status, self))
214
                self._headers = dict()
215
                for k, v in r.getheaders():
216
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
217
                        continue
218
                    v = unquote(v)
219
                    self._headers[k] = v
220
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
221
                self._content = r.read()
222
                recvlog.info('data size: %s\t[p: %s]' % (
223
                    len(self._content) if self._content else 0,
224
                    self))
225
                if self.LOG_DATA and self._content:
226
                    recvlog.info('%s\t[p: %s]' % (self._content, self))
227
        except Exception as err:
228
            from traceback import format_stack
229
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
230
            raise ClientError(
231
                'Failed while http-connecting to %s (%s)' % (
232
                    self.request.url,
233
                    err))
234

    
235
    @property
236
    def status_code(self):
237
        self._get_response()
238
        return self._status_code
239

    
240
    @property
241
    def status(self):
242
        self._get_response()
243
        return self._status
244

    
245
    @property
246
    def headers(self):
247
        self._get_response()
248
        return self._headers
249

    
250
    @property
251
    def content(self):
252
        self._get_response()
253
        return self._content
254

    
255
    @property
256
    def text(self):
257
        """
258
        :returns: (str) content
259
        """
260
        self._get_response()
261
        return '%s' % self._content
262

    
263
    @property
264
    def json(self):
265
        """
266
        :returns: (dict) squeezed from json-formated content
267
        """
268
        self._get_response()
269
        try:
270
            return loads(self._content)
271
        except ValueError as err:
272
            raise ClientError('Response not formated in JSON - %s' % err)
273

    
274

    
275
class SilentEvent(Thread):
276
    """Thread-run method(*args, **kwargs)"""
277
    def __init__(self, method, *args, **kwargs):
278
        super(self.__class__, self).__init__()
279
        self.method = method
280
        self.args = args
281
        self.kwargs = kwargs
282

    
283
    @property
284
    def exception(self):
285
        return getattr(self, '_exception', False)
286

    
287
    @property
288
    def value(self):
289
        return getattr(self, '_value', None)
290

    
291
    def run(self):
292
        try:
293
            self._value = self.method(*(self.args), **(self.kwargs))
294
        except Exception as e:
295
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
296
                self,
297
                type(e),
298
                e.status if isinstance(e, ClientError) else '',
299
                e))
300
            self._exception = e
301

    
302

    
303
class Client(object):
304

    
305
    MAX_THREADS = 7
306
    DATE_FORMATS = [
307
        '%a %b %d %H:%M:%S %Y',
308
        '%A, %d-%b-%y %H:%M:%S GMT',
309
        '%a, %d %b %Y %H:%M:%S GMT']
310
    LOG_TOKEN = False
311
    LOG_DATA = False
312

    
313
    def __init__(self, base_url, token):
314
        assert base_url, 'No base_url for client %s' % self
315
        self.base_url = base_url
316
        self.token = token
317
        self.headers, self.params = dict(), dict()
318

    
319
    def _init_thread_limit(self, limit=1):
320
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
321
        self._thread_limit = limit
322
        self._elapsed_old = 0.0
323
        self._elapsed_new = 0.0
324

    
325
    def _watch_thread_limit(self, threadlist):
326
        self._thread_limit = getattr(self, '_thread_limit', 1)
327
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
328
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
329
        recvlog.debug('# running threads: %s' % len(threadlist))
330
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
331
                self._thread_limit < self.MAX_THREADS):
332
            self._thread_limit += 1
333
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
334
            self._thread_limit -= 1
335

    
336
        self._elapsed_old = self._elapsed_new
337
        if len(threadlist) >= self._thread_limit:
338
            self._elapsed_new = 0.0
339
            for thread in threadlist:
340
                begin_time = time()
341
                thread.join()
342
                self._elapsed_new += time() - begin_time
343
            self._elapsed_new = self._elapsed_new / len(threadlist)
344
            return []
345
        return threadlist
346

    
347
    def _raise_for_status(self, r):
348
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
349
        status_msg = getattr(r, 'status', None) or ''
350
        try:
351
            message = '%s %s\n' % (status_msg, r.text)
352
        except:
353
            message = '%s %s\n' % (status_msg, r)
354
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
355
        raise ClientError(message, status=status)
356

    
357
    def set_header(self, name, value, iff=True):
358
        """Set a header 'name':'value'"""
359
        if value is not None and iff:
360
            self.headers[name] = value
361

    
362
    def set_param(self, name, value=None, iff=True):
363
        if iff:
364
            self.params[name] = value
365

    
366
    def request(
367
            self, method, path,
368
            async_headers=dict(), async_params=dict(),
369
            **kwargs):
370
        """Commit an HTTP request to base_url/path
371
        Requests are commited to and performed by Request/ResponseManager
372
        These classes perform a lazy http request. Present method, by default,
373
        enforces them to perform the http call. Hint: call present method with
374
        success=None to get a non-performed ResponseManager object.
375
        """
376
        assert isinstance(method, str) or isinstance(method, unicode)
377
        assert method
378
        assert isinstance(path, str) or isinstance(path, unicode)
379
        try:
380
            headers = dict(self.headers)
381
            headers.update(async_headers)
382
            params = dict(self.params)
383
            params.update(async_params)
384
            success = kwargs.pop('success', 200)
385
            data = kwargs.pop('data', None)
386
            headers.setdefault('X-Auth-Token', self.token)
387
            if 'json' in kwargs:
388
                data = dumps(kwargs.pop('json'))
389
                headers.setdefault('Content-Type', 'application/json')
390
            if data:
391
                headers.setdefault('Content-Length', '%s' % len(data))
392

    
393
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
394
            req = RequestManager(
395
                method, self.base_url, path,
396
                data=data, headers=headers, params=params)
397
            #  req.log()
398
            r = ResponseManager(req)
399
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
400
        finally:
401
            self.headers = dict()
402
            self.params = dict()
403

    
404
        if success is not None:
405
            # Success can either be an int or a collection
406
            success = (success,) if isinstance(success, int) else success
407
            if r.status_code not in success:
408
                self._raise_for_status(r)
409
        return r
410

    
411
    def delete(self, path, **kwargs):
412
        return self.request('delete', path, **kwargs)
413

    
414
    def get(self, path, **kwargs):
415
        return self.request('get', path, **kwargs)
416

    
417
    def head(self, path, **kwargs):
418
        return self.request('head', path, **kwargs)
419

    
420
    def post(self, path, **kwargs):
421
        return self.request('post', path, **kwargs)
422

    
423
    def put(self, path, **kwargs):
424
        return self.request('put', path, **kwargs)
425

    
426
    def copy(self, path, **kwargs):
427
        return self.request('copy', path, **kwargs)
428

    
429
    def move(self, path, **kwargs):
430
        return self.request('move', path, **kwargs)