Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 4018326d

History | View | Annotate | Download (15.2 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady
40
from time import sleep
41
from random import random
42

    
43
from objpool.http import PooledHTTPConnection
44

    
45
from kamaki import logger
46

    
47
LOG_FILE = logger.get_log_filename()
48
TIMEOUT = 60.0   # seconds
49
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
50

    
51
logger.add_file_logger('clients.send', __name__, filename=LOG_FILE)
52
sendlog = logger.get_logger('clients.send')
53
sendlog.debug('Logging location: %s' % LOG_FILE)
54

    
55
logger.add_file_logger('data.send', __name__, filename=LOG_FILE)
56
datasendlog = logger.get_logger('data.send')
57

    
58
logger.add_file_logger('clients.recv', __name__, filename=LOG_FILE)
59
recvlog = logger.get_logger('clients.recv')
60

    
61
logger.add_file_logger('data.recv', __name__, filename=LOG_FILE)
62
datarecvlog = logger.get_logger('data.recv')
63

    
64
logger.add_file_logger('ClientError', __name__, filename=LOG_FILE)
65
clienterrorlog = logger.get_logger('ClientError')
66

    
67

    
68
def _encode(v):
69
    if v and isinstance(v, unicode):
70
        return quote(v.encode('utf-8'))
71
    return v
72

    
73

    
74
class ClientError(Exception):
75
    def __init__(self, message, status=0, details=None):
76
        clienterrorlog.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
77
            message,
78
            status,
79
            details))
80
        try:
81
            message += '' if message and message[-1] == '\n' else '\n'
82
            serv_stat, sep, new_msg = message.partition('{')
83
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
84
            json_msg = loads(new_msg)
85
            key = json_msg.keys()[0]
86
            serv_stat = serv_stat.strip()
87

    
88
            json_msg = json_msg[key]
89
            message = '%s %s (%s)\n' % (
90
                serv_stat,
91
                key,
92
                json_msg['message']) if (
93
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
94
            status = json_msg.get('code', status)
95
            if 'details' in json_msg:
96
                if not details:
97
                    details = []
98
                if not isinstance(details, list):
99
                    details = [details]
100
                if json_msg['details']:
101
                    details.append(json_msg['details'])
102
        except Exception:
103
            pass
104
        finally:
105
            while message.endswith('\n\n'):
106
                message = message[:-1]
107
            super(ClientError, self).__init__(message)
108
            self.status = status if isinstance(status, int) else 0
109
            self.details = details if details else []
110

    
111

    
112
class Logged(object):
113

    
114
    LOG_TOKEN = False
115
    LOG_DATA = False
116

    
117

    
118
class RequestManager(Logged):
119
    """Handle http request information"""
120

    
121
    def _connection_info(self, url, path, params={}):
122
        """ Set self.url to scheme://netloc/?params
123
        :param url: (str or unicode) The service url
124

125
        :param path: (str or unicode) The service path (url/path)
126

127
        :param params: (dict) Parameters to add to final url
128

129
        :returns: (scheme, netloc)
130
        """
131
        url = _encode(url) if url else 'http://127.0.0.1/'
132
        url += '' if url.endswith('/') else '/'
133
        if path:
134
            url += _encode(path[1:] if path.startswith('/') else path)
135
        delim = '?'
136
        for key, val in params.items():
137
            val = _encode(val)
138
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
139
            delim = '&'
140
        parsed = urlparse(url)
141
        self.url = url
142
        self.path = parsed.path or '/'
143
        if parsed.query:
144
            self.path += '?%s' % parsed.query
145
        return (parsed.scheme, parsed.netloc)
146

    
147
    def __init__(
148
            self, method, url, path,
149
            data=None, headers={}, params={}):
150
        method = method.upper()
151
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
152
        if headers:
153
            assert isinstance(headers, dict)
154
        self.headers = dict(headers)
155
        self.method, self.data = method, data
156
        self.scheme, self.netloc = self._connection_info(url, path, params)
157

    
158
    def log(self):
159
        sendlog.info('%s %s://%s%s\t[%s]' % (
160
            self.method,
161
            self.scheme,
162
            self.netloc,
163
            self.path,
164
            self))
165
        for key, val in self.headers.items():
166
            if (not self.LOG_TOKEN) and key.lower() == 'x-auth-token':
167
                continue
168
            sendlog.info('  %s: %s\t[%s]' % (key, val, self))
169
        if self.data:
170
            sendlog.info('data size:%s\t[%s]' % (len(self.data), self))
171
            if self.LOG_DATA:
172
                datasendlog.info(self.data)
173
        else:
174
            sendlog.info('data size:0\t[%s]' % self)
175
        sendlog.info('')
176

    
177
    def perform(self, conn):
178
        """
179
        :param conn: (httplib connection object)
180

181
        :returns: (HTTPResponse)
182
        """
183
        conn.request(
184
            method=str(self.method.upper()),
185
            url=str(self.path),
186
            headers=self.headers,
187
            body=self.data)
188
        self.log()
189
        keep_trying = TIMEOUT
190
        while keep_trying > 0:
191
            try:
192
                return conn.getresponse()
193
            except ResponseNotReady:
194
                wait = 0.03 * random()
195
                sleep(wait)
196
                keep_trying -= wait
197
        logmsg = 'Kamaki Timeout %s %s\t[%s]' % (self.method, self.path, self)
198
        recvlog.debug(logmsg)
199
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
200

    
201

    
202
class ResponseManager(Logged):
203
    """Manage the http request and handle the response data, headers, etc."""
204

    
205
    def __init__(self, request, poolsize=None):
206
        """
207
        :param request: (RequestManager)
208
        """
209
        self.request = request
210
        self._request_performed = False
211
        self.poolsize = poolsize
212

    
213
    def _get_response(self):
214
        if self._request_performed:
215
            return
216

    
217
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
218
        try:
219
            with PooledHTTPConnection(
220
                    self.request.netloc, self.request.scheme,
221
                    **pool_kw) as connection:
222
                self.request.LOG_TOKEN = self.LOG_TOKEN
223
                self.request.LOG_DATA = self.LOG_DATA
224
                r = self.request.perform(connection)
225
                recvlog.info('[resp: %s] <-- [req: %s]\n' % (r, self.request))
226
                self._request_performed = True
227
                self._status_code, self._status = r.status, r.reason
228
                recvlog.info(
229
                    '%d %s\t[p: %s]' % (self.status_code, self.status, self))
230
                self._headers = dict()
231
                for k, v in r.getheaders():
232
                    if (not self.LOG_TOKEN) and k.lower() == 'x-auth-token':
233
                        continue
234
                    self._headers[k] = v
235
                    recvlog.info('  %s: %s\t[p: %s]' % (k, v, self))
236
                self._content = r.read()
237
                recvlog.info('data size: %s\t[p: %s]' % (
238
                    len(self._content) if self._content else 0,
239
                    self))
240
                if self.LOG_DATA and self._content:
241
                    datarecvlog.info('%s\t[p: %s]' % (self._content, self))
242
        except Exception as err:
243
            from traceback import format_stack
244
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
245
            raise ClientError(
246
                'Failed while http-connecting to %s (%s)' % (
247
                    self.request.url,
248
                    err))
249

    
250
    @property
251
    def status_code(self):
252
        self._get_response()
253
        return self._status_code
254

    
255
    @property
256
    def status(self):
257
        self._get_response()
258
        return self._status
259

    
260
    @property
261
    def headers(self):
262
        self._get_response()
263
        return self._headers
264

    
265
    @property
266
    def content(self):
267
        self._get_response()
268
        return self._content
269

    
270
    @property
271
    def text(self):
272
        """
273
        :returns: (str) content
274
        """
275
        self._get_response()
276
        return '%s' % self._content
277

    
278
    @property
279
    def json(self):
280
        """
281
        :returns: (dict) squeezed from json-formated content
282
        """
283
        self._get_response()
284
        try:
285
            return loads(self._content)
286
        except ValueError as err:
287
            raise ClientError('Response not formated in JSON - %s' % err)
288

    
289

    
290
class SilentEvent(Thread):
291
    """Thread-run method(*args, **kwargs)"""
292
    def __init__(self, method, *args, **kwargs):
293
        super(self.__class__, self).__init__()
294
        self.method = method
295
        self.args = args
296
        self.kwargs = kwargs
297

    
298
    @property
299
    def exception(self):
300
        return getattr(self, '_exception', False)
301

    
302
    @property
303
    def value(self):
304
        return getattr(self, '_value', None)
305

    
306
    def run(self):
307
        try:
308
            self._value = self.method(*(self.args), **(self.kwargs))
309
        except Exception as e:
310
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
311
                self,
312
                type(e),
313
                e.status if isinstance(e, ClientError) else '',
314
                e))
315
            self._exception = e
316

    
317

    
318
class Client(object):
319

    
320
    MAX_THREADS = 7
321
    DATE_FORMATS = [
322
        '%a %b %d %H:%M:%S %Y',
323
        '%A, %d-%b-%y %H:%M:%S GMT',
324
        '%a, %d %b %Y %H:%M:%S GMT']
325
    LOG_TOKEN = False
326
    LOG_DATA = False
327

    
328
    def __init__(self, base_url, token):
329
        self.base_url = base_url
330
        self.token = token
331
        self.headers, self.params = dict(), dict()
332

    
333
    def _init_thread_limit(self, limit=1):
334
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
335
        self._thread_limit = limit
336
        self._elapsed_old = 0.0
337
        self._elapsed_new = 0.0
338

    
339
    def _watch_thread_limit(self, threadlist):
340
        self._thread_limit = getattr(self, '_thread_limit', 1)
341
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
342
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
343
        recvlog.debug('# running threads: %s' % len(threadlist))
344
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
345
                self._thread_limit < self.MAX_THREADS):
346
            self._thread_limit += 1
347
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
348
            self._thread_limit -= 1
349

    
350
        self._elapsed_old = self._elapsed_new
351
        if len(threadlist) >= self._thread_limit:
352
            self._elapsed_new = 0.0
353
            for thread in threadlist:
354
                begin_time = time()
355
                thread.join()
356
                self._elapsed_new += time() - begin_time
357
            self._elapsed_new = self._elapsed_new / len(threadlist)
358
            return []
359
        return threadlist
360

    
361
    def _raise_for_status(self, r):
362
        clienterrorlog.debug('raise err from [%s] of type[%s]' % (r, type(r)))
363
        status_msg = getattr(r, 'status', None) or ''
364
        try:
365
            message = '%s %s\n' % (status_msg, r.text)
366
        except:
367
            message = '%s %s\n' % (status_msg, r)
368
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
369
        raise ClientError(message, status=status)
370

    
371
    def set_header(self, name, value, iff=True):
372
        """Set a header 'name':'value'"""
373
        if value is not None and iff:
374
            self.headers[name] = value
375

    
376
    def set_param(self, name, value=None, iff=True):
377
        if iff:
378
            self.params[name] = value
379

    
380
    def request(
381
            self, method, path,
382
            async_headers=dict(), async_params=dict(),
383
            **kwargs):
384
        """Commit an HTTP request to base_url/path
385
        Requests are commited to and performed by Request/ResponseManager
386
        These classes perform a lazy http request. Present method, by default,
387
        enforces them to perform the http call. Hint: call present method with
388
        success=None to get a non-performed ResponseManager object.
389
        """
390
        assert isinstance(method, str) or isinstance(method, unicode)
391
        assert method
392
        assert isinstance(path, str) or isinstance(path, unicode)
393
        try:
394
            headers = dict(self.headers)
395
            headers.update(async_headers)
396
            params = dict(self.params)
397
            params.update(async_params)
398
            success = kwargs.pop('success', 200)
399
            data = kwargs.pop('data', None)
400
            headers.setdefault('X-Auth-Token', self.token)
401
            if 'json' in kwargs:
402
                data = dumps(kwargs.pop('json'))
403
                headers.setdefault('Content-Type', 'application/json')
404
            if data:
405
                headers.setdefault('Content-Length', '%s' % len(data))
406

    
407
            sendlog.debug('\n\nCMT %s@%s\t[%s]', method, self.base_url, self)
408
            req = RequestManager(
409
                method, self.base_url, path,
410
                data=data, headers=headers, params=params)
411
            #  req.log()
412
            r = ResponseManager(req)
413
            r.LOG_TOKEN, r.LOG_DATA = self.LOG_TOKEN, self.LOG_DATA
414
        finally:
415
            self.headers = dict()
416
            self.params = dict()
417

    
418
        if success is not None:
419
            # Success can either be an int or a collection
420
            success = (success,) if isinstance(success, int) else success
421
            if r.status_code not in success:
422
                self._raise_for_status(r)
423
        return r
424

    
425
    def delete(self, path, **kwargs):
426
        return self.request('delete', path, **kwargs)
427

    
428
    def get(self, path, **kwargs):
429
        return self.request('get', path, **kwargs)
430

    
431
    def head(self, path, **kwargs):
432
        return self.request('head', path, **kwargs)
433

    
434
    def post(self, path, **kwargs):
435
        return self.request('post', path, **kwargs)
436

    
437
    def put(self, path, **kwargs):
438
        return self.request('put', path, **kwargs)
439

    
440
    def copy(self, path, **kwargs):
441
        return self.request('copy', path, **kwargs)
442

    
443
    def move(self, path, **kwargs):
444
        return self.request('move', path, **kwargs)