Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 2005b18e

History | View | Annotate | Download (9.3 kB)

1 43ca98ee Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 2b74ab4a Stavros Sachtouris
from threading import Thread
35 de4f08ef Stavros Sachtouris
from json import dumps, loads
36 cad39033 Stavros Sachtouris
from time import time
37 6a0b1658 Giorgos Verigakis
import logging
38 c270fe96 Stavros Sachtouris
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39 1f417830 Stavros Sachtouris
from kamaki.clients.connection.errors import HTTPConnectionError
40 1f417830 Stavros Sachtouris
from kamaki.clients.connection.errors import HTTPResponseError
41 6a0b1658 Giorgos Verigakis
42 6a0b1658 Giorgos Verigakis
sendlog = logging.getLogger('clients.send')
43 1a3c18fd Stavros Sachtouris
datasendlog = logging.getLogger('data.send')
44 6a0b1658 Giorgos Verigakis
recvlog = logging.getLogger('clients.recv')
45 1a3c18fd Stavros Sachtouris
datarecvlog = logging.getLogger('data.recv')
46 6a0b1658 Giorgos Verigakis
47 3dabe5d2 Stavros Sachtouris
48 a1c50326 Giorgos Verigakis
class ClientError(Exception):
49 4f989909 Stavros Sachtouris
    def __init__(self, message, status=0, details=None):
50 de4f08ef Stavros Sachtouris
        try:
51 1f417830 Stavros Sachtouris
            message += '' if message and message[-1] == '\n' else '\n'
52 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
53 062b1d0a Stavros Sachtouris
            new_msg = sep + new_msg
54 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
55 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
56 062b1d0a Stavros Sachtouris
57 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
58 062b1d0a Stavros Sachtouris
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
59 062b1d0a Stavros Sachtouris
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
60 de4f08ef Stavros Sachtouris
            if 'code' in json_msg:
61 de4f08ef Stavros Sachtouris
                status = json_msg['code']
62 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
63 de4f08ef Stavros Sachtouris
                if not details:
64 de4f08ef Stavros Sachtouris
                    details = []
65 de4f08ef Stavros Sachtouris
                elif not isinstance(details, list):
66 de4f08ef Stavros Sachtouris
                    details = [details]
67 de4f08ef Stavros Sachtouris
                if json_msg['details']:
68 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
69 de4f08ef Stavros Sachtouris
        except:
70 de4f08ef Stavros Sachtouris
            pass
71 de4f08ef Stavros Sachtouris
72 7e5415a4 Stavros Sachtouris
        super(ClientError, self).__init__(message)
73 a1c50326 Giorgos Verigakis
        self.status = status
74 4f989909 Stavros Sachtouris
        self.details = details if details else []
75 a1c50326 Giorgos Verigakis
76 3dabe5d2 Stavros Sachtouris
77 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
78 2b74ab4a Stavros Sachtouris
    """ Thread-run method(*args, **kwargs)
79 2b74ab4a Stavros Sachtouris
        put exception in exception_bucket
80 2b74ab4a Stavros Sachtouris
    """
81 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
82 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
83 2b74ab4a Stavros Sachtouris
        self.method = method
84 2b74ab4a Stavros Sachtouris
        self.args = args
85 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
86 2b74ab4a Stavros Sachtouris
87 2b74ab4a Stavros Sachtouris
    @property
88 2b74ab4a Stavros Sachtouris
    def exception(self):
89 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
90 2b74ab4a Stavros Sachtouris
91 2b74ab4a Stavros Sachtouris
    @property
92 2b74ab4a Stavros Sachtouris
    def value(self):
93 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
94 2b74ab4a Stavros Sachtouris
95 2b74ab4a Stavros Sachtouris
    def run(self):
96 2b74ab4a Stavros Sachtouris
        try:
97 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
98 2b74ab4a Stavros Sachtouris
        except Exception as e:
99 7644c38e Stavros Sachtouris
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
100 7644c38e Stavros Sachtouris
                self,
101 7644c38e Stavros Sachtouris
                type(e),
102 7644c38e Stavros Sachtouris
                e.status if isinstance(e, ClientError) else '',
103 7644c38e Stavros Sachtouris
                e))
104 2b74ab4a Stavros Sachtouris
            self._exception = e
105 2b74ab4a Stavros Sachtouris
106 6069b53b Stavros Sachtouris
107 6a0b1658 Giorgos Verigakis
class Client(object):
108 cccff590 Stavros Sachtouris
    POOL_SIZE = 7
109 cad39033 Stavros Sachtouris
110 5b263ba2 Stavros Sachtouris
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
111 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
112 6c62a96d Giorgos Verigakis
        self.token = token
113 e742badc Stavros Sachtouris
        self.headers = {}
114 de73876b Stavros Sachtouris
        self.DATE_FORMATS = [
115 de73876b Stavros Sachtouris
            '%a %b %d %H:%M:%S %Y',
116 de73876b Stavros Sachtouris
            '%A, %d-%b-%y %H:%M:%S GMT',
117 de73876b Stavros Sachtouris
            '%a, %d %b %Y %H:%M:%S GMT']
118 a2e8e549 Stavros Sachtouris
        self.http_client = http_client
119 6c62a96d Giorgos Verigakis
120 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
121 cad39033 Stavros Sachtouris
        self._thread_limit = limit
122 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
123 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
124 cad39033 Stavros Sachtouris
125 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
126 16c895db Stavros Sachtouris
        recvlog.debug('# running threads: %s' % len(threadlist))
127 24ff0a35 Stavros Sachtouris
        if (self._elapsed_old > self._elapsed_new) and (
128 2005b18e Stavros Sachtouris
                self._thread_limit < self.POOL_SIZE):
129 2005b18e Stavros Sachtouris
            self._thread_limit += 1
130 cad39033 Stavros Sachtouris
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
131 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
132 cad39033 Stavros Sachtouris
133 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
134 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
135 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
136 cad39033 Stavros Sachtouris
            for thread in threadlist:
137 cad39033 Stavros Sachtouris
                begin_time = time()
138 cad39033 Stavros Sachtouris
                thread.join()
139 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
140 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
141 cad39033 Stavros Sachtouris
            return []
142 cad39033 Stavros Sachtouris
        return threadlist
143 cad39033 Stavros Sachtouris
144 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
145 7966ffb8 Stavros Sachtouris
        status_msg = getattr(r, 'status', '')
146 d804de82 Stavros Sachtouris
        try:
147 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
148 d804de82 Stavros Sachtouris
        except:
149 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
150 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
151 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
152 6a0b1658 Giorgos Verigakis
153 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
154 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
155 4adfa919 Stavros Sachtouris
        if value is not None and iff:
156 2f749e6e Stavros Sachtouris
            self.http_client.set_header(name, value)
157 2f749e6e Stavros Sachtouris
158 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
159 2f749e6e Stavros Sachtouris
        if iff:
160 2f749e6e Stavros Sachtouris
            self.http_client.set_param(name, value)
161 2f749e6e Stavros Sachtouris
162 2f749e6e Stavros Sachtouris
    def set_default_header(self, name, value):
163 2f749e6e Stavros Sachtouris
        self.http_client.headers.setdefault(name, value)
164 e742badc Stavros Sachtouris
165 de73876b Stavros Sachtouris
    def request(
166 24ff0a35 Stavros Sachtouris
            self,
167 24ff0a35 Stavros Sachtouris
            method,
168 24ff0a35 Stavros Sachtouris
            path,
169 24ff0a35 Stavros Sachtouris
            async_headers={},
170 24ff0a35 Stavros Sachtouris
            async_params={},
171 24ff0a35 Stavros Sachtouris
            **kwargs):
172 9a7efb0d Stavros Sachtouris
        """In threaded/asynchronous requests, headers and params are not safe
173 3dabe5d2 Stavros Sachtouris
        Therefore, the standard self.set_header/param system can be used only
174 3dabe5d2 Stavros Sachtouris
        for headers and params that are common for all requests. All other
175 3dabe5d2 Stavros Sachtouris
        params and headers should passes as
176 9a7efb0d Stavros Sachtouris
        @param async_headers
177 9a7efb0d Stavros Sachtouris
        @async_params
178 3dabe5d2 Stavros Sachtouris
        E.g. in most queries the 'X-Auth-Token' header might be the same for
179 3dabe5d2 Stavros Sachtouris
        all, but the 'Range' header might be different from request to request.
180 9a7efb0d Stavros Sachtouris
        """
181 d726b3d0 Stavros Sachtouris
        try:
182 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
183 2f749e6e Stavros Sachtouris
184 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
185 2f749e6e Stavros Sachtouris
            self.set_default_header('X-Auth-Token', self.token)
186 2f749e6e Stavros Sachtouris
187 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
188 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
189 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Type', 'application/json')
190 2f749e6e Stavros Sachtouris
            if data:
191 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Length', unicode(len(data)))
192 2f749e6e Stavros Sachtouris
193 a40e152f Stavros Sachtouris
            sendlog.info('perform a %s @ %s', method, self.base_url)
194 a40e152f Stavros Sachtouris
195 7d91734c Stavros Sachtouris
            self.http_client.url = self.base_url
196 7d91734c Stavros Sachtouris
            self.http_client.path = path
197 de73876b Stavros Sachtouris
            r = self.http_client.perform_request(
198 de73876b Stavros Sachtouris
                method,
199 3dabe5d2 Stavros Sachtouris
                data,
200 3dabe5d2 Stavros Sachtouris
                async_headers,
201 3dabe5d2 Stavros Sachtouris
                async_params)
202 2f749e6e Stavros Sachtouris
203 2f749e6e Stavros Sachtouris
            req = self.http_client
204 5b263ba2 Stavros Sachtouris
            sendlog.info('%s %s', method, req.url)
205 1785ad41 Stavros Sachtouris
            headers = dict(req.headers)
206 1785ad41 Stavros Sachtouris
            headers.update(async_headers)
207 1785ad41 Stavros Sachtouris
208 1785ad41 Stavros Sachtouris
            for key, val in headers.items():
209 1785ad41 Stavros Sachtouris
                sendlog.info('\t%s: %s', key, val)
210 2f749e6e Stavros Sachtouris
            sendlog.info('')
211 2f749e6e Stavros Sachtouris
            if data:
212 1a3c18fd Stavros Sachtouris
                datasendlog.info(data)
213 2f749e6e Stavros Sachtouris
214 2f749e6e Stavros Sachtouris
            recvlog.info('%d %s', r.status_code, r.status)
215 2f749e6e Stavros Sachtouris
            for key, val in r.headers.items():
216 2f749e6e Stavros Sachtouris
                recvlog.info('%s: %s', key, val)
217 bcb51856 Stavros Sachtouris
            if r.content:
218 1a3c18fd Stavros Sachtouris
                datarecvlog.info(r.content)
219 2f749e6e Stavros Sachtouris
220 1f417830 Stavros Sachtouris
        except (HTTPResponseError, HTTPConnectionError) as err:
221 1f417830 Stavros Sachtouris
            from traceback import format_stack
222 1f417830 Stavros Sachtouris
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
223 2f749e6e Stavros Sachtouris
            self.http_client.reset_headers()
224 2f749e6e Stavros Sachtouris
            self.http_client.reset_params()
225 1f417830 Stavros Sachtouris
            errstr = '%s' % err
226 1f417830 Stavros Sachtouris
            if not errstr:
227 1f417830 Stavros Sachtouris
                errstr = ('%s' % type(err))[7:-2]
228 de73876b Stavros Sachtouris
            status = getattr(err, 'status', getattr(err, 'errno', 0))
229 de73876b Stavros Sachtouris
            raise ClientError('%s\n' % errstr, status=status)
230 a52d2256 Stavros Sachtouris
231 a52d2256 Stavros Sachtouris
        self.http_client.reset_headers()
232 a52d2256 Stavros Sachtouris
        self.http_client.reset_params()
233 0238c167 Stavros Sachtouris
234 0238c167 Stavros Sachtouris
        if success is not None:
235 0238c167 Stavros Sachtouris
            # Success can either be an in or a collection
236 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
237 0238c167 Stavros Sachtouris
            if r.status_code not in success:
238 0238c167 Stavros Sachtouris
                r.release()
239 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
240 d726b3d0 Stavros Sachtouris
        return r
241 a1c50326 Giorgos Verigakis
242 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
243 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
244 6a0b1658 Giorgos Verigakis
245 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
246 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
247 6a0b1658 Giorgos Verigakis
248 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
249 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
250 6a0b1658 Giorgos Verigakis
251 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
252 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
253 6a0b1658 Giorgos Verigakis
254 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
255 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
256 6a0b1658 Giorgos Verigakis
257 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
258 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
259 4adfa919 Stavros Sachtouris
260 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
261 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)