Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 67377ec3

History | View | Annotate | Download (20.3 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from urllib2 import quote, unquote
35
from urlparse import urlparse
36
from threading import Thread
37
from json import dumps, loads
38
from time import time
39
from httplib import ResponseNotReady, HTTPException
40
from time import sleep
41
from random import random
42
from logging import getLogger
43

    
44
from objpool.http import PooledHTTPConnection
45

    
46

    
47
TIMEOUT = 60.0   # seconds
48
HTTP_METHODS = ['GET', 'POST', 'PUT', 'HEAD', 'DELETE', 'COPY', 'MOVE']
49

    
50
log = getLogger(__name__)
51
sendlog = getLogger('%s.send' % __name__)
52
recvlog = getLogger('%s.recv' % __name__)
53

    
54

    
55
def _encode(v):
56
    if v and isinstance(v, unicode):
57
        return quote(v.encode('utf-8'))
58
    return v
59

    
60

    
61
class ClientError(Exception):
62
    def __init__(self, message, status=0, details=None):
63
        log.debug('ClientError: msg[%s], sts[%s], dtl[%s]' % (
64
            message,
65
            status,
66
            details))
67
        try:
68
            message += '' if message and message[-1] == '\n' else '\n'
69
            serv_stat, sep, new_msg = message.partition('{')
70
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
71
            json_msg = loads(new_msg)
72
            key = json_msg.keys()[0]
73
            serv_stat = serv_stat.strip()
74

    
75
            json_msg = json_msg[key]
76
            message = '%s %s (%s)\n' % (
77
                serv_stat,
78
                key,
79
                json_msg['message']) if (
80
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
81
            status = json_msg.get('code', status)
82
            if 'details' in json_msg:
83
                if not details:
84
                    details = []
85
                if not isinstance(details, list):
86
                    details = [details]
87
                if json_msg['details']:
88
                    details.append(json_msg['details'])
89
        except Exception:
90
            pass
91
        finally:
92
            while message.endswith('\n\n'):
93
                message = message[:-1]
94
            super(ClientError, self).__init__(message)
95
            self.status = status if isinstance(status, int) else 0
96
            self.details = details if details else []
97

    
98

    
99
class Logged(object):
100

    
101
    LOG_TOKEN = False
102
    LOG_DATA = False
103
    LOG_PID = False
104
    _token = None
105

    
106

    
107
class RequestManager(Logged):
108
    """Handle http request information"""
109

    
110
    def _connection_info(self, url, path, params={}):
111
        """ Set self.url to scheme://netloc/?params
112
        :param url: (str or unicode) The service url
113

114
        :param path: (str or unicode) The service path (url/path)
115

116
        :param params: (dict) Parameters to add to final url
117

118
        :returns: (scheme, netloc)
119
        """
120
        url = _encode(str(url)) if url else 'http://127.0.0.1/'
121
        url += '' if url.endswith('/') else '/'
122
        if path:
123
            url += _encode(path[1:] if path.startswith('/') else path)
124
        delim = '?'
125
        for key, val in params.items():
126
            val = quote('' if val in (None, False) else _encode('%s' % val))
127
            url += '%s%s%s' % (delim, key, ('=%s' % val) if val else '')
128
            delim = '&'
129
        parsed = urlparse(url)
130
        self.url = url
131
        self.path = parsed.path or '/'
132
        if parsed.query:
133
            self.path += '?%s' % parsed.query
134
        return (parsed.scheme, parsed.netloc)
135

    
136
    def __init__(
137
            self, method, url, path,
138
            data=None, headers={}, params={}):
139
        method = method.upper()
140
        assert method in HTTP_METHODS, 'Invalid http method %s' % method
141
        if headers:
142
            assert isinstance(headers, dict)
143
        self.headers = dict(headers)
144
        self.method, self.data = method, data
145
        self.scheme, self.netloc = self._connection_info(url, path, params)
146

    
147
    def dump_log(self):
148
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
149
        sendlog.info('- -  -   -     -        -             -')
150
        sendlog.info('%s %s://%s%s%s' % (
151
            self.method, self.scheme, self.netloc, self.path, plog))
152
        for key, val in self.headers.items():
153
            if key.lower() in ('x-auth-token', ) and not self.LOG_TOKEN:
154
                self._token, val = val, '...'
155
            sendlog.info('  %s: %s%s' % (key, val, plog))
156
        if self.data:
157
            sendlog.info('data size: %s%s' % (len(self.data), plog))
158
            if self.LOG_DATA:
159
                sendlog.info(self.data.replace(self._token, '...') if (
160
                    self._token) else self.data)
161
        else:
162
            sendlog.info('data size: 0%s' % plog)
163

    
164
    def perform(self, conn):
165
        """
166
        :param conn: (httplib connection object)
167

168
        :returns: (HTTPResponse)
169
        """
170
        self.dump_log()
171
        conn.request(
172
            method=str(self.method.upper()),
173
            url=str(self.path),
174
            headers=self.headers,
175
            body=self.data)
176
        sendlog.info('')
177
        keep_trying = TIMEOUT
178
        while keep_trying > 0:
179
            try:
180
                return conn.getresponse()
181
            except ResponseNotReady:
182
                wait = 0.03 * random()
183
                sleep(wait)
184
                keep_trying -= wait
185
        plog = ('\t[%s]' % self) if self.LOG_PID else ''
186
        logmsg = 'Kamaki Timeout %s %s%s' % (self.method, self.path, plog)
187
        recvlog.debug(logmsg)
188
        raise ClientError('HTTPResponse takes too long - kamaki timeout')
189

    
190

    
191
class ResponseManager(Logged):
192
    """Manage the http request and handle the response data, headers, etc."""
193

    
194
    def __init__(self, request, poolsize=None, connection_retry_limit=0):
195
        """
196
        :param request: (RequestManager)
197

198
        :param poolsize: (int) the size of the connection pool
199

200
        :param connection_retry_limit: (int)
201
        """
202
        self.CONNECTION_TRY_LIMIT = 1 + connection_retry_limit
203
        self.request = request
204
        self._request_performed = False
205
        self.poolsize = poolsize
206

    
207
    def _get_response(self):
208
        if self._request_performed:
209
            return
210

    
211
        pool_kw = dict(size=self.poolsize) if self.poolsize else dict()
212
        for retries in range(1, self.CONNECTION_TRY_LIMIT + 1):
213
            try:
214
                with PooledHTTPConnection(
215
                        self.request.netloc, self.request.scheme,
216
                        **pool_kw) as connection:
217
                    self.request.LOG_TOKEN = self.LOG_TOKEN
218
                    self.request.LOG_DATA = self.LOG_DATA
219
                    self.request.LOG_PID = self.LOG_PID
220
                    r = self.request.perform(connection)
221
                    plog = ''
222
                    if self.LOG_PID:
223
                        recvlog.info('\n%s <-- %s <-- [req: %s]\n' % (
224
                            self, r, self.request))
225
                        plog = '\t[%s]' % self
226
                    self._request_performed = True
227
                    self._status_code, self._status = r.status, unquote(
228
                        r.reason)
229
                    recvlog.info(
230
                        '%d %s%s' % (
231
                            self.status_code, self.status, plog))
232
                    self._headers = dict()
233
                    for k, v in r.getheaders():
234
                        if k.lower in ('x-auth-token', ) and (
235
                                not self.LOG_TOKEN):
236
                            self._token, v = v, '...'
237
                        v = unquote(v)
238
                        self._headers[k] = v
239
                        recvlog.info('  %s: %s%s' % (k, v, plog))
240
                    self._content = r.read()
241
                    recvlog.info('data size: %s%s' % (
242
                        len(self._content) if self._content else 0, plog))
243
                    if self.LOG_DATA and self._content:
244
                        data = '%s%s' % (self._content, plog)
245
                        if self._token:
246
                            data = data.replace(self._token, '...')
247
                        recvlog.info(data)
248
                    recvlog.info('-             -        -     -   -  - -')
249
                break
250
            except Exception as err:
251
                if isinstance(err, HTTPException):
252
                    if retries >= self.CONNECTION_TRY_LIMIT:
253
                        raise ClientError(
254
                            'Connection to %s failed %s times (%s: %s )' % (
255
                                self.request.url, retries, type(err), err))
256
                else:
257
                    from traceback import format_stack
258
                    recvlog.debug(
259
                        '\n'.join(['%s' % type(err)] + format_stack()))
260
                    raise
261
                    raise ClientError(
262
                        'Failed while http-connecting to %s (%s)' % (
263
                            self.request.url, err))
264

    
265
    @property
266
    def status_code(self):
267
        self._get_response()
268
        return self._status_code
269

    
270
    @property
271
    def status(self):
272
        self._get_response()
273
        return self._status
274

    
275
    @property
276
    def headers(self):
277
        self._get_response()
278
        return self._headers
279

    
280
    @property
281
    def content(self):
282
        self._get_response()
283
        return self._content
284

    
285
    @property
286
    def text(self):
287
        """
288
        :returns: (str) content
289
        """
290
        self._get_response()
291
        return '%s' % self._content
292

    
293
    @property
294
    def json(self):
295
        """
296
        :returns: (dict) squeezed from json-formated content
297
        """
298
        self._get_response()
299
        try:
300
            return loads(self._content)
301
        except ValueError as err:
302
            raise ClientError('Response not formated in JSON - %s' % err)
303

    
304

    
305
class SilentEvent(Thread):
306
    """Thread-run method(*args, **kwargs)"""
307
    def __init__(self, method, *args, **kwargs):
308
        super(self.__class__, self).__init__()
309
        self.method = method
310
        self.args = args
311
        self.kwargs = kwargs
312

    
313
    @property
314
    def exception(self):
315
        return getattr(self, '_exception', False)
316

    
317
    @property
318
    def value(self):
319
        return getattr(self, '_value', None)
320

    
321
    def run(self):
322
        try:
323
            self._value = self.method(*(self.args), **(self.kwargs))
324
        except Exception as e:
325
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
326
                self,
327
                type(e),
328
                e.status if isinstance(e, ClientError) else '',
329
                e))
330
            self._exception = e
331

    
332

    
333
class Client(Logged):
334

    
335
    MAX_THREADS = 1
336
    DATE_FORMATS = ['%a %b %d %H:%M:%S %Y', ]
337
    CONNECTION_RETRY_LIMIT = 0
338

    
339
    def __init__(self, base_url, token):
340
        assert base_url, 'No base_url for client %s' % self
341
        self.base_url = base_url
342
        self.token = token
343
        self.headers, self.params = dict(), dict()
344
        self.poolsize = None
345

    
346
    def _init_thread_limit(self, limit=1):
347
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
348
        self._thread_limit = limit
349
        self._elapsed_old = 0.0
350
        self._elapsed_new = 0.0
351

    
352
    def _watch_thread_limit(self, threadlist):
353
        self._thread_limit = getattr(self, '_thread_limit', 1)
354
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
355
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
356
        recvlog.debug('# running threads: %s' % len(threadlist))
357
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
358
                self._thread_limit < self.MAX_THREADS):
359
            self._thread_limit += 1
360
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
361
            self._thread_limit -= 1
362

    
363
        self._elapsed_old = self._elapsed_new
364
        if len(threadlist) >= self._thread_limit:
365
            self._elapsed_new = 0.0
366
            for thread in threadlist:
367
                begin_time = time()
368
                thread.join()
369
                self._elapsed_new += time() - begin_time
370
            self._elapsed_new = self._elapsed_new / len(threadlist)
371
            return []
372
        return threadlist
373

    
374
    def async_run(self, method, kwarg_list):
375
        """Fire threads of operations
376

377
        :param method: the method to run in each thread
378

379
        :param kwarg_list: (list of dicts) the arguments to pass in each method
380
            call
381

382
        :returns: (list) the results of each method call w.r. to the order of
383
            kwarg_list
384
        """
385
        flying, results = {}, {}
386
        self._init_thread_limit()
387
        for index, kwargs in enumerate(kwarg_list):
388
            self._watch_thread_limit(flying.values())
389
            flying[index] = SilentEvent(method=method, **kwargs)
390
            flying[index].start()
391
            unfinished = {}
392
            for key, thread in flying.items():
393
                if thread.isAlive():
394
                    unfinished[key] = thread
395
                elif thread.exception:
396
                    raise thread.exception
397
                else:
398
                    results[key] = thread.value
399
            flying = unfinished
400
        sendlog.info('- - - wait for threads to finish')
401
        for key, thread in flying.items():
402
            if thread.isAlive():
403
                thread.join()
404
            if thread.exception:
405
                raise thread.exception
406
            results[key] = thread.value
407
        return results.values()
408

    
409
    def _raise_for_status(self, r):
410
        log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
411
        status_msg = getattr(r, 'status', None) or ''
412
        try:
413
            message = '%s %s\n' % (status_msg, r.text)
414
        except:
415
            message = '%s %s\n' % (status_msg, r)
416
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
417
        raise ClientError(message, status=status)
418

    
419
    def set_header(self, name, value, iff=True):
420
        """Set a header 'name':'value'"""
421
        if value is not None and iff:
422
            self.headers['%s' % name] = '%s' % value
423

    
424
    def set_param(self, name, value=None, iff=True):
425
        if iff:
426
            self.params[name] = '%s' % value
427

    
428
    def request(
429
            self, method, path,
430
            async_headers=dict(), async_params=dict(),
431
            **kwargs):
432
        """Commit an HTTP request to base_url/path
433
        Requests are commited to and performed by Request/ResponseManager
434
        These classes perform a lazy http request. Present method, by default,
435
        enforces them to perform the http call. Hint: call present method with
436
        success=None to get a non-performed ResponseManager object.
437
        """
438
        assert isinstance(method, str) or isinstance(method, unicode)
439
        assert method
440
        assert isinstance(path, str) or isinstance(path, unicode)
441
        try:
442
            headers = dict(self.headers)
443
            headers.update(async_headers)
444
            params = dict(self.params)
445
            params.update(async_params)
446
            success = kwargs.pop('success', 200)
447
            data = kwargs.pop('data', None)
448
            headers.setdefault('X-Auth-Token', self.token)
449
            if 'json' in kwargs:
450
                data = dumps(kwargs.pop('json'))
451
                headers.setdefault('Content-Type', 'application/json')
452
            if data:
453
                headers.setdefault('Content-Length', '%s' % len(data))
454

    
455
            plog = ('\t[%s]' % self) if self.LOG_PID else ''
456
            sendlog.debug('\n\nCMT %s@%s%s', method, self.base_url, plog)
457
            req = RequestManager(
458
                method, self.base_url, path,
459
                data=data, headers=headers, params=params)
460
            #  req.log()
461
            r = ResponseManager(
462
                req,
463
                poolsize=self.poolsize,
464
                connection_retry_limit=self.CONNECTION_RETRY_LIMIT)
465
            r.LOG_TOKEN, r.LOG_DATA, r.LOG_PID = (
466
                self.LOG_TOKEN, self.LOG_DATA, self.LOG_PID)
467
            r._token = headers['X-Auth-Token']
468
        finally:
469
            self.headers = dict()
470
            self.params = dict()
471

    
472
        if success is not None:
473
            # Success can either be an int or a collection
474
            success = (success,) if isinstance(success, int) else success
475
            if r.status_code not in success:
476
                self._raise_for_status(r)
477
        return r
478

    
479
    def delete(self, path, **kwargs):
480
        return self.request('delete', path, **kwargs)
481

    
482
    def get(self, path, **kwargs):
483
        return self.request('get', path, **kwargs)
484

    
485
    def head(self, path, **kwargs):
486
        return self.request('head', path, **kwargs)
487

    
488
    def post(self, path, **kwargs):
489
        return self.request('post', path, **kwargs)
490

    
491
    def put(self, path, **kwargs):
492
        return self.request('put', path, **kwargs)
493

    
494
    def copy(self, path, **kwargs):
495
        return self.request('copy', path, **kwargs)
496

    
497
    def move(self, path, **kwargs):
498
        return self.request('move', path, **kwargs)
499

    
500

    
501
class Waiter(object):
502

    
503
    def _wait(
504
            self, item_id, wait_status, get_status,
505
            delay=1, max_wait=100, wait_cb=None, wait_for_status=False):
506
        """Wait while the item is still in wait_status or to reach it
507

508
        :param server_id: integer (str or int)
509

510
        :param wait_status: (str)
511

512
        :param get_status: (method(self, item_id)) if called, returns
513
            (status, progress %) If no way to tell progress, return None
514

515
        :param delay: time interval between retries
516

517
        :param wait_cb: (method(total steps)) returns a generator for
518
            reporting progress or timeouts i.e., for a progress bar
519

520
        :param wait_for_status: (bool) wait FOR (True) or wait WHILE (False)
521

522
        :returns: (str) the new mode if successful, (bool) False if timed out
523
        """
524
        status, progress = get_status(self, item_id)
525

    
526
        if wait_cb:
527
            wait_gen = wait_cb(max_wait // delay)
528
            wait_gen.next()
529

    
530
        if wait_for_status ^ (status != wait_status):
531
            # if wait_cb:
532
            #     try:
533
            #         wait_gen.next()
534
            #     except Exception:
535
            #         pass
536
            return status
537
        old_wait = total_wait = 0
538

    
539
        while (wait_for_status ^ (status == wait_status)) and (
540
                total_wait <= max_wait):
541
            if wait_cb:
542
                try:
543
                    for i in range(total_wait - old_wait):
544
                        wait_gen.next()
545
                except Exception:
546
                    break
547
            old_wait = total_wait
548
            total_wait = progress or total_wait + 1
549
            sleep(delay)
550
            status, progress = get_status(self, item_id)
551

    
552
        if total_wait < max_wait:
553
            if wait_cb:
554
                try:
555
                    for i in range(max_wait):
556
                        wait_gen.next()
557
                except:
558
                    pass
559
        return status if (wait_for_status ^ (status != wait_status)) else False
560

    
561
    def wait_for(
562
            self, item_id, target_status, get_status,
563
            delay=1, max_wait=100, wait_cb=None):
564
        self._wait(
565
            item_id, target_status, get_status, delay, max_wait, wait_cb,
566
            wait_for_status=True)
567

    
568
    def wait_while(
569
            self, item_id, target_status, get_status,
570
            delay=1, max_wait=100, wait_cb=None):
571
        self._wait(
572
            item_id, target_status, get_status, delay, max_wait, wait_cb,
573
            wait_for_status=False)