Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 81c60832

History | View | Annotate | Download (17.9 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105

    
106

    
107
class RequestManager(Logged):
108
    """Handle http request information"""
109

    
110
    def _connection_info(self, url, path, params={}):
111
        """ Set self.url to scheme://netloc/?params
112
        :param url: (str or unicode) The service url
113

114
        :param path: (str or unicode) The service path (url/path)
115

116
        :param params: (dict) Parameters to add to final url
117

118
        :returns: (scheme, netloc)
119
        """
120
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121
        url += '' if url.endswith('/') else '/'
122
        if path:
123
            url += _encode(path[1:] if path.startswith('/') else path)
124
        delim = '?'
125
        for key, val in params.items():
126
            val = '' if val in (None, False) else _encode(u'%s' % val)
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128
            delim = '&'
129
        parsed = urlparse(url)
130
        self.url = url
131
        self.path = parsed.path or '/'
132
        if parsed.query:
133
            self.path += '?%s' % parsed.query
134
        return (parsed.scheme, parsed.netloc)
135

    
136
    def __init__(
137
            self, method, url, path,
138
            data=None, headers={}, params={}):
139
        method = method.upper()
140
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141
        if headers:
142
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
        self.method, self.data = method, data
145
        self.scheme, self.netloc = self._connection_info(url, path, params)
146

    
147
    def dump_log(self):
148
        plog = '\t[%s]' if self.LOG_PID else ''
149
        sendlog.info('- -  -   -     -        -             -')
150
        sendlog.info('%s %s://%s%s%s' % (
151
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154
                self._token, val = val, '...'
155
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size:%s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data.replace(self._token, '...') if (
160
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size:0%s' % plog)
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        self.dump_log()
171
        conn.request(
172
            method=str(self.method.upper()),
173
            url=str(self.path),
174
            headers=self.headers,
175
            body=self.data)
176
        sendlog.info('')
177
        keep_trying = TIMEOUT
178
        while keep_trying > 0:
179
            try:
180
                return conn.getresponse()
181
            except ResponseNotReady:
182
                wait = 0.03 * random()
183
                sleep(wait)
184
                keep_trying -= wait
185
        plog = '\t[%s]' if self.LOG_PID else ''
186
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187
        recvlog.debug(logmsg)
188
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
189

    
190

    
191
class ResponseManager(Logged):
192
    """Manage the http request and handle the response data, headers, etc."""
193

    
194
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
195
        """
196
        :param request: (RequestManager)
197

198
        :param poolsize: (int) the size of the connection pool
199

200
        :param connection_retry_limit: (int)
201
        """
202
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203
        self.request = request
204
        self._request_performed = False
205
        self.poolsize = poolsize
206

    
207
    def _get_response(self):
208
        if self._request_performed:
209
            return
210

    
211
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213
            try:
214
                with PooledHTTPConnection(
215
                        self.request.netloc, self.request.scheme,
216
                        **pool_kw) as connection:
217
                    self.request.LOG_TOKEN = self.LOG_TOKEN
218
                    self.request.LOG_DATA = self.LOG_DATA
219
                    self.request.LOG_PID = self.LOG_PID
220
                    r = self.request.perform(connection)
221
                    plog = ''
222
                    if self.LOG_PID:
223
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224
                            self, r, self.request))
225
                        plog = '\t[%s]' % self
226
                    self._request_performed = True
227
                    self._status_code, self._status = r.status, unquote(
228
                        r.reason)
229
                    recvlog.info(
230
                        '%d %s%s' % (
231
                            self.status_code, self.status, plog))
232
                    self._headers = dict()
233
                    for k, v in r.getheaders():
234
                        if k.lower in ('x-auth-token', ) and (
235
                                not self.LOG_TOKEN):
236
                            self._token, v = v, '...'
237
                        v = unquote(v)
238
                        self._headers[k] = v
239
                        recvlog.info('  %s: %s%s' % (k, v, plog))
240
                    self._content = r.read()
241
                    recvlog.info('data size: %s%s' % (
242
                        len(self._content) if self._content else 0, plog))
243
                    if self.LOG_DATA and self._content:
244
                        data = '%s%s' % (self._content, plog)
245
                        if self._token:
246
                            data = data.replace(self._token, '...')
247
                        sendlog.info(data)
248
                    sendlog.info('-             -        -     -   -  - -')
249
                break
250
            except Exception as err:
251
                if isinstance(err, HTTPException):
252
                    if retries >= self.CONNECTION_TRY_LIMIT:
253
                        raise ClientError(
254
                            'Connection to %s failed %s times (%s: %s )' % (
255
                                self.request.url, retries, type(err), err))
256
                else:
257
                    from traceback import format_stack
258
                    recvlog.debug(
259
                        '\n'.join(['%s' % type(err)] + format_stack()))
260
                    raise ClientError(
261
                        'Failed while http-connecting to %s (%s)' % (
262
                            self.request.url, err))
263

    
264
    @property
265
    def status_code(self):
266
        self._get_response()
267
        return self._status_code
268

    
269
    @property
270
    def status(self):
271
        self._get_response()
272
        return self._status
273

    
274
    @property
275
    def headers(self):
276
        self._get_response()
277
        return self._headers
278

    
279
    @property
280
    def content(self):
281
        self._get_response()
282
        return self._content
283

    
284
    @property
285
    def text(self):
286
        """
287
        :returns: (str) content
288
        """
289
        self._get_response()
290
        return '%s' % self._content
291

    
292
    @property
293
    def json(self):
294
        """
295
        :returns: (dict) squeezed from json-formated content
296
        """
297
        self._get_response()
298
        try:
299
            return loads(self._content)
300
        except ValueError as err:
301
            raise ClientError('Response not formated in JSON - %s' % err)
302

    
303

    
304
class SilentEvent(Thread):
305
    """Thread-run method(*args, **kwargs)"""
306
    def __init__(self, method, *args, **kwargs):
307
        super(self.__class__, self).__init__()
308
        self.method = method
309
        self.args = args
310
        self.kwargs = kwargs
311

    
312
    @property
313
    def exception(self):
314
        return getattr(self, '_exception', False)
315

    
316
    @property
317
    def value(self):
318
        return getattr(self, '_value', None)
319

    
320
    def run(self):
321
        try:
322
            self._value = self.method(*(self.args), **(self.kwargs))
323
        except Exception as e:
324
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
325
                self,
326
                type(e),
327
                e.status if isinstance(e, ClientError) else '',
328
                e))
329
            self._exception = e
330

    
331

    
332
class Client(Logged):
333

    
334
    MAX_THREADS = 7
335
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
336
    CONNECTION_RETRY_LIMIT = 0
337

    
338
    def __init__(self, base_url, token):
339
        assert base_url, 'No base_url for client %s' % self
340
        self.base_url = base_url
341
        self.token = token
342
        self.headers, self.params = dict(), dict()
343

    
344
    def _init_thread_limit(self, limit=1):
345
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
346
        self._thread_limit = limit
347
        self._elapsed_old = 0.0
348
        self._elapsed_new = 0.0
349

    
350
    def _watch_thread_limit(self, threadlist):
351
        self._thread_limit = getattr(self, '_thread_limit', 1)
352
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
353
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
354
        recvlog.debug('# running threads: %s' % len(threadlist))
355
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
356
                self._thread_limit < self.MAX_THREADS):
357
            self._thread_limit += 1
358
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
359
            self._thread_limit -= 1
360

    
361
        self._elapsed_old = self._elapsed_new
362
        if len(threadlist) >= self._thread_limit:
363
            self._elapsed_new = 0.0
364
            for thread in threadlist:
365
                begin_time = time()
366
                thread.join()
367
                self._elapsed_new += time() - begin_time
368
            self._elapsed_new = self._elapsed_new / len(threadlist)
369
            return []
370
        return threadlist
371

    
372
    def async_run(self, method, kwarg_list):
373
        """Fire threads of operations
374

375
        :param method: the method to run in each thread
376

377
        :param kwarg_list: (list of dicts) the arguments to pass in each method
378
            call
379

380
        :returns: (list) the results of each method call w.r. to the order of
381
            kwarg_list
382
        """
383
        flying, results = {}, {}
384
        self._init_thread_limit()
385
        for index, kwargs in enumerate(kwarg_list):
386
            self._watch_thread_limit(flying.values())
387
            flying[index] = SilentEvent(method=method, **kwargs)
388
            flying[index].start()
389
            unfinished = {}
390
            for key, thread in flying.items():
391
                if thread.isAlive():
392
                    unfinished[key] = thread
393
                elif thread.exception:
394
                    print 'HERE IS AN EXCEPTION MK?'
395
                    raise thread.exception
396
                else:
397
                    results[key] = thread.value
398
                print 'NO EXCEPTION', thread.value
399
            flying = unfinished
400
        sendlog.info('- - - wait for threads to finish')
401
        for key, thread in flying.items():
402
            if thread.isAlive():
403
                thread.join()
404
            elif thread.exception:
405
                print 'HERE IS AN EXCEPTION MK-2?'
406
                raise thread.exception
407
            results[key] = thread.value
408
            print 'NO EXCEPTION-2', thread.value
409
        return results.values()
410

    
411
    def _raise_for_status(self, r):
412
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
413
        status_msg = getattr(r, 'status', None) or ''
414
        try:
415
            message = '%s %s\n' % (status_msg, r.text)
416
        except:
417
            message = '%s %s\n' % (status_msg, r)
418
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
419
        raise ClientError(message, status=status)
420

    
421
    def set_header(self, name, value, iff=True):
422
        """Set a header 'name':'value'"""
423
        if value is not None and iff:
424
            self.headers[name] = unicode(value)
425

    
426
    def set_param(self, name, value=None, iff=True):
427
        if iff:
428
            self.params[name] = unicode(value)
429

    
430
    def request(
431
            self, method, path,
432
            async_headers=dict(), async_params=dict(),
433
            **kwargs):
434
        """Commit an HTTP request to base_url/path
435
        Requests are commited to and performed by Request/ResponseManager
436
        These classes perform a lazy http request. Present method, by default,
437
        enforces them to perform the http call. Hint: call present method with
438
        success=None to get a non-performed ResponseManager object.
439
        """
440
        assert isinstance(method, str) or isinstance(method, unicode)
441
        assert method
442
        assert isinstance(path, str) or isinstance(path, unicode)
443
        try:
444
            headers = dict(self.headers)
445
            headers.update(async_headers)
446
            params = dict(self.params)
447
            params.update(async_params)
448
            success = kwargs.pop('success', 200)
449
            data = kwargs.pop('data', None)
450
            headers.setdefault('X-Auth-Token', self.token)
451
            if 'json' in kwargs:
452
                data = dumps(kwargs.pop('json'))
453
                headers.setdefault('Content-Type', 'application/json')
454
            if data:
455
                headers.setdefault('Content-Length', '%s' % len(data))
456

    
457
            plog = '\t[%s]' if self.LOG_PID else ''
458
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
459
            req = RequestManager(
460
                method, self.base_url, path,
461
                data=data, headers=headers, params=params)
462
            #  req.log()
463
            r = ResponseManager(
464
                req, connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
465
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
466
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
467
            r._token = headers['X-Auth-Token']
468
        finally:
469
            self.headers = dict()
470
            self.params = dict()
471

    
472
        if success is not None:
473
            # Success can either be an int or a collection
474
            success = (success,) if isinstance(success, int) else success
475
            if r.status_code not in success:
476
                self._raise_for_status(r)
477
        return r
478

    
479
    def delete(self, path, **kwargs):
480
        return self.request('delete', path, **kwargs)
481

    
482
    def get(self, path, **kwargs):
483
        return self.request('get', path, **kwargs)
484

    
485
    def head(self, path, **kwargs):
486
        return self.request('head', path, **kwargs)
487

    
488
    def post(self, path, **kwargs):
489
        return self.request('post', path, **kwargs)
490

    
491
    def put(self, path, **kwargs):
492
        return self.request('put', path, **kwargs)
493

    
494
    def copy(self, path, **kwargs):
495
        return self.request('copy', path, **kwargs)
496

    
497
    def move(self, path, **kwargs):
498
        return self.request('move', path, **kwargs)