Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ b8220825

History | View | Annotate | Download (20.5 kB)

1
# Copyright 2011-2014 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105

    
106

    
107
class RequestManager(Logged):
108
    """Handle http request information"""
109

    
110
    def _connection_info(self, url, path, params={}):
111
        """ Set self.url to scheme://netloc/?params
112
        :param url: (str or unicode) The service url
113

114
        :param path: (str or unicode) The service path (url/path)
115

116
        :param params: (dict) Parameters to add to final url
117

118
        :returns: (scheme, netloc)
119
        """
120
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121
        url += '' if url.endswith('/') else '/'
122
        if path:
123
            url += _encode(path[1:] if path.startswith('/') else path)
124
        delim = '?'
125
        for key, val in params.items():
126
            val = quote('' if val in (None, False) else _encode('%s' % val))
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128
            delim = '&'
129
        parsed = urlparse(url)
130
        self.url = url
131
        self.path = parsed.path or '/'
132
        if parsed.query:
133
            self.path += '?%s' % parsed.query
134
        return (parsed.scheme, parsed.netloc)
135

    
136
    def __init__(
137
            self, method, url, path,
138
            data=None, headers={}, params={}):
139
        method = method.upper()
140
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141
        if headers:
142
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
        self.method, self.data = method, data
145
        self.scheme, self.netloc = self._connection_info(url, path, params)
146

    
147
    def dump_log(self):
148
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149
        sendlog.info('- -  -   -     -        -             -')
150
        sendlog.info('%s %s://%s%s%s' % (
151
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154
                self._token, val = val, '...'
155
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size: %s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data.replace(self._token, '...') if (
160
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size: 0%s' % plog)
163

    
164
    def _encode_headers(self):
165
        headers = self.headers
166
        for k, v in self.headers.items():
167
            headers[k] = quote(v)
168
        self.headers = headers
169

    
170
    def perform(self, conn):
171
        """
172
        :param conn: (httplib connection object)
173

174
        :returns: (HTTPResponse)
175
        """
176
        self._encode_headers()
177
        self.dump_log()
178
        conn.request(
179
            method=str(self.method.upper()),
180
            url=str(self.path),
181
            headers=self.headers,
182
            body=self.data)
183
        sendlog.info('')
184
        keep_trying = TIMEOUT
185
        while keep_trying > 0:
186
            try:
187
                return conn.getresponse()
188
            except ResponseNotReady:
189
                wait = 0.03 * random()
190
                sleep(wait)
191
                keep_trying -= wait
192
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
193
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
194
        recvlog.debug(logmsg)
195
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
196

    
197

    
198
class ResponseManager(Logged):
199
    """Manage the http request and handle the response data, headers, etc."""
200

    
201
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
202
        """
203
        :param request: (RequestManager)
204

205
        :param poolsize: (int) the size of the connection pool
206

207
        :param connection_retry_limit: (int)
208
        """
209
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
210
        self.request = request
211
        self._request_performed = False
212
        self.poolsize = poolsize
213

    
214
    def _get_response(self):
215
        if self._request_performed:
216
            return
217

    
218
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
219
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
220
            try:
221
                with PooledHTTPConnection(
222
                        self.request.netloc, self.request.scheme,
223
                        **pool_kw) as connection:
224
                    self.request.LOG_TOKEN = self.LOG_TOKEN
225
                    self.request.LOG_DATA = self.LOG_DATA
226
                    self.request.LOG_PID = self.LOG_PID
227
                    r = self.request.perform(connection)
228
                    plog = ''
229
                    if self.LOG_PID:
230
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
231
                            self, r, self.request))
232
                        plog = '\t[%s]' % self
233
                    self._request_performed = True
234
                    self._status_code, self._status = r.status, unquote(
235
                        r.reason)
236
                    recvlog.info(
237
                        '%d %s%s' % (
238
                            self.status_code, self.status, plog))
239
                    self._headers = dict()
240
                    for k, v in r.getheaders():
241
                        if k.lower in ('x-auth-token', ) and (
242
                                not self.LOG_TOKEN):
243
                            self._token, v = v, '...'
244
                        v = unquote(v).decode('utf-8')
245
                        self._headers[k] = v
246
                        recvlog.info('  %s: %s%s' % (k, v, plog))
247
                    self._content = r.read()
248
                    recvlog.info('data size: %s%s' % (
249
                        len(self._content) if self._content else 0, plog))
250
                    if self.LOG_DATA and self._content:
251
                        data = '%s%s' % (self._content, plog)
252
                        if self._token:
253
                            data = data.replace(self._token, '...')
254
                        recvlog.info(data)
255
                    recvlog.info('-             -        -     -   -  - -')
256
                break
257
            except Exception as err:
258
                if isinstance(err, HTTPException):
259
                    if retries >= self.CONNECTION_TRY_LIMIT:
260
                        raise ClientError(
261
                            'Connection to %s failed %s times (%s: %s )' % (
262
                                self.request.url, retries, type(err), err))
263
                else:
264
                    from traceback import format_stack
265
                    recvlog.debug(
266
                        '\n'.join(['%s' % type(err)] + format_stack()))
267
                    raise
268
                    raise ClientError(
269
                        'Failed while http-connecting to %s (%s)' % (
270
                            self.request.url, err))
271

    
272
    @property
273
    def status_code(self):
274
        self._get_response()
275
        return self._status_code
276

    
277
    @property
278
    def status(self):
279
        self._get_response()
280
        return self._status
281

    
282
    @property
283
    def headers(self):
284
        self._get_response()
285
        return self._headers
286

    
287
    @property
288
    def content(self):
289
        self._get_response()
290
        return self._content
291

    
292
    @property
293
    def text(self):
294
        """
295
        :returns: (str) content
296
        """
297
        self._get_response()
298
        return '%s' % self._content
299

    
300
    @property
301
    def json(self):
302
        """
303
        :returns: (dict) squeezed from json-formated content
304
        """
305
        self._get_response()
306
        try:
307
            return loads(self._content)
308
        except ValueError as err:
309
            raise ClientError('Response not formated in JSON - %s' % err)
310

    
311

    
312
class SilentEvent(Thread):
313
    """Thread-run method(*args, **kwargs)"""
314
    def __init__(self, method, *args, **kwargs):
315
        super(self.__class__, self).__init__()
316
        self.method = method
317
        self.args = args
318
        self.kwargs = kwargs
319

    
320
    @property
321
    def exception(self):
322
        return getattr(self, '_exception', False)
323

    
324
    @property
325
    def value(self):
326
        return getattr(self, '_value', None)
327

    
328
    def run(self):
329
        try:
330
            self._value = self.method(*(self.args), **(self.kwargs))
331
        except Exception as e:
332
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
333
                self,
334
                type(e),
335
                e.status if isinstance(e, ClientError) else '',
336
                e))
337
            self._exception = e
338

    
339

    
340
class Client(Logged):
341

    
342
    MAX_THREADS = 1
343
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
344
    CONNECTION_RETRY_LIMIT = 0
345

    
346
    def __init__(self, base_url, token):
347
        assert base_url, 'No base_url for client %s' % self
348
        self.base_url = base_url
349
        self.token = token
350
        self.headers, self.params = dict(), dict()
351
        self.poolsize = None
352

    
353
    def _init_thread_limit(self, limit=1):
354
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
355
        self._thread_limit = limit
356
        self._elapsed_old = 0.0
357
        self._elapsed_new = 0.0
358

    
359
    def _watch_thread_limit(self, threadlist):
360
        self._thread_limit = getattr(self, '_thread_limit', 1)
361
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
362
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
363
        recvlog.debug('# running threads: %s' % len(threadlist))
364
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
365
                self._thread_limit < self.MAX_THREADS):
366
            self._thread_limit += 1
367
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
368
            self._thread_limit -= 1
369

    
370
        self._elapsed_old = self._elapsed_new
371
        if len(threadlist) >= self._thread_limit:
372
            self._elapsed_new = 0.0
373
            for thread in threadlist:
374
                begin_time = time()
375
                thread.join()
376
                self._elapsed_new += time() - begin_time
377
            self._elapsed_new = self._elapsed_new / len(threadlist)
378
            return []
379
        return threadlist
380

    
381
    def async_run(self, method, kwarg_list):
382
        """Fire threads of operations
383

384
        :param method: the method to run in each thread
385

386
        :param kwarg_list: (list of dicts) the arguments to pass in each method
387
            call
388

389
        :returns: (list) the results of each method call w.r. to the order of
390
            kwarg_list
391
        """
392
        flying, results = {}, {}
393
        self._init_thread_limit()
394
        for index, kwargs in enumerate(kwarg_list):
395
            self._watch_thread_limit(flying.values())
396
            flying[index] = SilentEvent(method=method, **kwargs)
397
            flying[index].start()
398
            unfinished = {}
399
            for key, thread in flying.items():
400
                if thread.isAlive():
401
                    unfinished[key] = thread
402
                elif thread.exception:
403
                    raise thread.exception
404
                else:
405
                    results[key] = thread.value
406
            flying = unfinished
407
        sendlog.info('- - - wait for threads to finish')
408
        for key, thread in flying.items():
409
            if thread.isAlive():
410
                thread.join()
411
            if thread.exception:
412
                raise thread.exception
413
            results[key] = thread.value
414
        return results.values()
415

    
416
    def _raise_for_status(self, r):
417
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
418
        status_msg = getattr(r, 'status', None) or ''
419
        try:
420
            message = '%s %s\n' % (status_msg, r.text)
421
        except:
422
            message = '%s %s\n' % (status_msg, r)
423
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
424
        raise ClientError(message, status=status)
425

    
426
    def set_header(self, name, value, iff=True):
427
        """Set a header 'name':'value'"""
428
        if value is not None and iff:
429
            self.headers['%s' % name] = '%s' % value
430

    
431
    def set_param(self, name, value=None, iff=True):
432
        if iff:
433
            self.params[name] = '%s' % value
434

    
435
    def request(
436
            self, method, path,
437
            async_headers=dict(), async_params=dict(),
438
            **kwargs):
439
        """Commit an HTTP request to base_url/path
440
        Requests are commited to and performed by Request/ResponseManager
441
        These classes perform a lazy http request. Present method, by default,
442
        enforces them to perform the http call. Hint: call present method with
443
        success=None to get a non-performed ResponseManager object.
444
        """
445
        assert isinstance(method, str) or isinstance(method, unicode)
446
        assert method
447
        assert isinstance(path, str) or isinstance(path, unicode)
448
        try:
449
            headers = dict(self.headers)
450
            headers.update(async_headers)
451
            params = dict(self.params)
452
            params.update(async_params)
453
            success = kwargs.pop('success', 200)
454
            data = kwargs.pop('data', None)
455
            headers.setdefault('X-Auth-Token', self.token)
456
            if 'json' in kwargs:
457
                data = dumps(kwargs.pop('json'))
458
                headers.setdefault('Content-Type', 'application/json')
459
            if data:
460
                headers.setdefault('Content-Length', '%s' % len(data))
461

    
462
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
463
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
464
            req = RequestManager(
465
                method, self.base_url, path,
466
                data=data, headers=headers, params=params)
467
            #  req.log()
468
            r = ResponseManager(
469
                req,
470
                poolsize=self.poolsize,
471
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
472
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
473
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
474
            r._token = headers['X-Auth-Token']
475
        finally:
476
            self.headers = dict()
477
            self.params = dict()
478

    
479
        if success is not None:
480
            # Success can either be an int or a collection
481
            success = (success,) if isinstance(success, int) else success
482
            if r.status_code not in success:
483
                self._raise_for_status(r)
484
        return r
485

    
486
    def delete(self, path, **kwargs):
487
        return self.request('delete', path, **kwargs)
488

    
489
    def get(self, path, **kwargs):
490
        return self.request('get', path, **kwargs)
491

    
492
    def head(self, path, **kwargs):
493
        return self.request('head', path, **kwargs)
494

    
495
    def post(self, path, **kwargs):
496
        return self.request('post', path, **kwargs)
497

    
498
    def put(self, path, **kwargs):
499
        return self.request('put', path, **kwargs)
500

    
501
    def copy(self, path, **kwargs):
502
        return self.request('copy', path, **kwargs)
503

    
504
    def move(self, path, **kwargs):
505
        return self.request('move', path, **kwargs)
506

    
507

    
508
class Waiter(object):
509

    
510
    def _wait(
511
            self, item_id, wait_status, get_status,
512
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
513
        """Wait while the item is still in wait_status or to reach it
514

515
        :param server_id: integer (str or int)
516

517
        :param wait_status: (str)
518

519
        :param get_status: (method(self, item_id)) if called, returns
520
            (status, progress %) If no way to tell progress, return None
521

522
        :param delay: time interval between retries
523

524
        :param wait_cb: (method(total steps)) returns a generator for
525
            reporting progress or timeouts i.e., for a progress bar
526

527
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
528

529
        :returns: (str) the new mode if successful, (bool) False if timed out
530
        """
531
        status, progress = get_status(self, item_id)
532

    
533
        if wait_cb:
534
            wait_gen = wait_cb(max_wait // delay)
535
            wait_gen.next()
536

    
537
        if wait_for_status ^ (status != wait_status):
538
            # if wait_cb:
539
            #     try:
540
            #         wait_gen.next()
541
            #     except Exception:
542
            #         pass
543
            return status
544
        old_wait = total_wait = 0
545

    
546
        while (wait_for_status ^ (status == wait_status)) and (
547
                total_wait <= max_wait):
548
            if wait_cb:
549
                try:
550
                    for i in range(total_wait - old_wait):
551
                        wait_gen.next()
552
                except Exception:
553
                    break
554
            old_wait = total_wait
555
            total_wait = progress or total_wait + 1
556
            sleep(delay)
557
            status, progress = get_status(self, item_id)
558

    
559
        if total_wait < max_wait:
560
            if wait_cb:
561
                try:
562
                    for i in range(max_wait):
563
                        wait_gen.next()
564
                except:
565
                    pass
566
        return status if (wait_for_status ^ (status != wait_status)) else False
567

    
568
    def wait_for(
569
            self, item_id, target_status, get_status,
570
            delay=1, max_wait=100, wait_cb=None):
571
        self._wait(
572
            item_id, target_status, get_status, delay, max_wait, wait_cb,
573
            wait_for_status=True)
574

    
575
    def wait_while(
576
            self, item_id, target_status, get_status,
577
            delay=1, max_wait=100, wait_cb=None):
578
        self._wait(
579
            item_id, target_status, get_status, delay, max_wait, wait_cb,
580
            wait_for_status=False)