Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 9c6c3d69

History | View | Annotate | Download (9.7 kB)

1 43ca98ee Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 fce31e83 Stavros Sachtouris
from urllib2 import quote
35 2b74ab4a Stavros Sachtouris
from threading import Thread
36 de4f08ef Stavros Sachtouris
from json import dumps, loads
37 cad39033 Stavros Sachtouris
from time import time
38 6a0b1658 Giorgos Verigakis
import logging
39 c270fe96 Stavros Sachtouris
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40 b2c5b650 Stavros Sachtouris
from kamaki.clients.connection.errors import KamakiConnectionError
41 b2c5b650 Stavros Sachtouris
from kamaki.clients.connection.errors import KamakiResponseError
42 6a0b1658 Giorgos Verigakis
43 6a0b1658 Giorgos Verigakis
sendlog = logging.getLogger('clients.send')
44 1a3c18fd Stavros Sachtouris
datasendlog = logging.getLogger('data.send')
45 6a0b1658 Giorgos Verigakis
recvlog = logging.getLogger('clients.recv')
46 1a3c18fd Stavros Sachtouris
datarecvlog = logging.getLogger('data.recv')
47 6a0b1658 Giorgos Verigakis
48 3dabe5d2 Stavros Sachtouris
49 a1c50326 Giorgos Verigakis
class ClientError(Exception):
50 4f989909 Stavros Sachtouris
    def __init__(self, message, status=0, details=None):
51 de4f08ef Stavros Sachtouris
        try:
52 1f417830 Stavros Sachtouris
            message += '' if message and message[-1] == '\n' else '\n'
53 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
54 f4de4c91 Stavros Sachtouris
            new_msg = sep + new_msg[:-1 if new_msg.endswith('\n') else 0]
55 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
56 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
57 f4de4c91 Stavros Sachtouris
            serv_stat = serv_stat.strip()
58 062b1d0a Stavros Sachtouris
59 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
60 f4de4c91 Stavros Sachtouris
            message = '%s %s (%s)\n' % (
61 f4de4c91 Stavros Sachtouris
                serv_stat,
62 f4de4c91 Stavros Sachtouris
                key,
63 f4de4c91 Stavros Sachtouris
                json_msg['message']) if (
64 f4de4c91 Stavros Sachtouris
                    'message' in json_msg) else '%s %s' % (serv_stat, key)
65 f4de4c91 Stavros Sachtouris
            status = json_msg.get('code', status)
66 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
67 de4f08ef Stavros Sachtouris
                if not details:
68 de4f08ef Stavros Sachtouris
                    details = []
69 f4de4c91 Stavros Sachtouris
                if not isinstance(details, list):
70 de4f08ef Stavros Sachtouris
                    details = [details]
71 de4f08ef Stavros Sachtouris
                if json_msg['details']:
72 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
73 f4de4c91 Stavros Sachtouris
        except Exception:
74 de4f08ef Stavros Sachtouris
            pass
75 f4de4c91 Stavros Sachtouris
        finally:
76 f4de4c91 Stavros Sachtouris
            super(ClientError, self).__init__(message)
77 f4de4c91 Stavros Sachtouris
            self.status = status if isinstance(status, int) else 0
78 f4de4c91 Stavros Sachtouris
            self.details = details if details else []
79 a1c50326 Giorgos Verigakis
80 3dabe5d2 Stavros Sachtouris
81 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
82 b2c5b650 Stavros Sachtouris
    """ Thread-run method(*args, **kwargs)"""
83 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
84 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
85 2b74ab4a Stavros Sachtouris
        self.method = method
86 2b74ab4a Stavros Sachtouris
        self.args = args
87 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
88 2b74ab4a Stavros Sachtouris
89 2b74ab4a Stavros Sachtouris
    @property
90 2b74ab4a Stavros Sachtouris
    def exception(self):
91 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
92 2b74ab4a Stavros Sachtouris
93 2b74ab4a Stavros Sachtouris
    @property
94 2b74ab4a Stavros Sachtouris
    def value(self):
95 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
96 2b74ab4a Stavros Sachtouris
97 2b74ab4a Stavros Sachtouris
    def run(self):
98 2b74ab4a Stavros Sachtouris
        try:
99 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
100 2b74ab4a Stavros Sachtouris
        except Exception as e:
101 7644c38e Stavros Sachtouris
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
102 7644c38e Stavros Sachtouris
                self,
103 7644c38e Stavros Sachtouris
                type(e),
104 7644c38e Stavros Sachtouris
                e.status if isinstance(e, ClientError) else '',
105 7644c38e Stavros Sachtouris
                e))
106 2b74ab4a Stavros Sachtouris
            self._exception = e
107 2b74ab4a Stavros Sachtouris
108 6069b53b Stavros Sachtouris
109 6a0b1658 Giorgos Verigakis
class Client(object):
110 cad39033 Stavros Sachtouris
111 5b263ba2 Stavros Sachtouris
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
113 6c62a96d Giorgos Verigakis
        self.token = token
114 e742badc Stavros Sachtouris
        self.headers = {}
115 de73876b Stavros Sachtouris
        self.DATE_FORMATS = [
116 de73876b Stavros Sachtouris
            '%a %b %d %H:%M:%S %Y',
117 de73876b Stavros Sachtouris
            '%A, %d-%b-%y %H:%M:%S GMT',
118 de73876b Stavros Sachtouris
            '%a, %d %b %Y %H:%M:%S GMT']
119 a2e8e549 Stavros Sachtouris
        self.http_client = http_client
120 9c6c3d69 Stavros Sachtouris
        self.MAX_THREADS = 7
121 6c62a96d Giorgos Verigakis
122 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
123 9c6c3d69 Stavros Sachtouris
        assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
124 cad39033 Stavros Sachtouris
        self._thread_limit = limit
125 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
126 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
127 cad39033 Stavros Sachtouris
128 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
129 9c6c3d69 Stavros Sachtouris
        self._thread_limit = getattr(self, '_thread_limit', 1)
130 9c6c3d69 Stavros Sachtouris
        self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
131 9c6c3d69 Stavros Sachtouris
        self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
132 16c895db Stavros Sachtouris
        recvlog.debug('# running threads: %s' % len(threadlist))
133 9c6c3d69 Stavros Sachtouris
        if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
134 16b0afe6 Stavros Sachtouris
                self._thread_limit < self.MAX_THREADS):
135 2005b18e Stavros Sachtouris
            self._thread_limit += 1
136 9c6c3d69 Stavros Sachtouris
        elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
137 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
138 cad39033 Stavros Sachtouris
139 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
140 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
141 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
142 cad39033 Stavros Sachtouris
            for thread in threadlist:
143 cad39033 Stavros Sachtouris
                begin_time = time()
144 cad39033 Stavros Sachtouris
                thread.join()
145 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
146 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
147 cad39033 Stavros Sachtouris
            return []
148 cad39033 Stavros Sachtouris
        return threadlist
149 cad39033 Stavros Sachtouris
150 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
151 7966ffb8 Stavros Sachtouris
        status_msg = getattr(r, 'status', '')
152 d804de82 Stavros Sachtouris
        try:
153 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
154 d804de82 Stavros Sachtouris
        except:
155 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
156 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
157 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
158 6a0b1658 Giorgos Verigakis
159 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
160 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
161 4adfa919 Stavros Sachtouris
        if value is not None and iff:
162 2f749e6e Stavros Sachtouris
            self.http_client.set_header(name, value)
163 2f749e6e Stavros Sachtouris
164 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
165 2f749e6e Stavros Sachtouris
        if iff:
166 2f749e6e Stavros Sachtouris
            self.http_client.set_param(name, value)
167 2f749e6e Stavros Sachtouris
168 2f749e6e Stavros Sachtouris
    def set_default_header(self, name, value):
169 2f749e6e Stavros Sachtouris
        self.http_client.headers.setdefault(name, value)
170 e742badc Stavros Sachtouris
171 de73876b Stavros Sachtouris
    def request(
172 24ff0a35 Stavros Sachtouris
            self,
173 24ff0a35 Stavros Sachtouris
            method,
174 24ff0a35 Stavros Sachtouris
            path,
175 24ff0a35 Stavros Sachtouris
            async_headers={},
176 24ff0a35 Stavros Sachtouris
            async_params={},
177 24ff0a35 Stavros Sachtouris
            **kwargs):
178 9a7efb0d Stavros Sachtouris
        """In threaded/asynchronous requests, headers and params are not safe
179 3dabe5d2 Stavros Sachtouris
        Therefore, the standard self.set_header/param system can be used only
180 3dabe5d2 Stavros Sachtouris
        for headers and params that are common for all requests. All other
181 3dabe5d2 Stavros Sachtouris
        params and headers should passes as
182 9a7efb0d Stavros Sachtouris
        @param async_headers
183 9a7efb0d Stavros Sachtouris
        @async_params
184 3dabe5d2 Stavros Sachtouris
        E.g. in most queries the 'X-Auth-Token' header might be the same for
185 3dabe5d2 Stavros Sachtouris
        all, but the 'Range' header might be different from request to request.
186 9a7efb0d Stavros Sachtouris
        """
187 d726b3d0 Stavros Sachtouris
        try:
188 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
189 2f749e6e Stavros Sachtouris
190 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
191 2f749e6e Stavros Sachtouris
            self.set_default_header('X-Auth-Token', self.token)
192 2f749e6e Stavros Sachtouris
193 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
194 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
195 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Type', 'application/json')
196 2f749e6e Stavros Sachtouris
            if data:
197 a517ff50 Stavros Sachtouris
                self.set_default_header('Content-Length', '%s' % len(data))
198 2f749e6e Stavros Sachtouris
199 a40e152f Stavros Sachtouris
            sendlog.info('perform a %s @ %s', method, self.base_url)
200 a40e152f Stavros Sachtouris
201 c1004a00 Stavros Sachtouris
            self.http_client.url = self.base_url
202 4de960c7 Stavros Sachtouris
            self.http_client.path = quote(path.encode('utf8'))
203 de73876b Stavros Sachtouris
            r = self.http_client.perform_request(
204 de73876b Stavros Sachtouris
                method,
205 3dabe5d2 Stavros Sachtouris
                data,
206 3dabe5d2 Stavros Sachtouris
                async_headers,
207 3dabe5d2 Stavros Sachtouris
                async_params)
208 2f749e6e Stavros Sachtouris
209 2f749e6e Stavros Sachtouris
            req = self.http_client
210 5b263ba2 Stavros Sachtouris
            sendlog.info('%s %s', method, req.url)
211 1785ad41 Stavros Sachtouris
            headers = dict(req.headers)
212 1785ad41 Stavros Sachtouris
            headers.update(async_headers)
213 1785ad41 Stavros Sachtouris
214 1785ad41 Stavros Sachtouris
            for key, val in headers.items():
215 1785ad41 Stavros Sachtouris
                sendlog.info('\t%s: %s', key, val)
216 2f749e6e Stavros Sachtouris
            sendlog.info('')
217 2f749e6e Stavros Sachtouris
            if data:
218 1a3c18fd Stavros Sachtouris
                datasendlog.info(data)
219 2f749e6e Stavros Sachtouris
220 2f749e6e Stavros Sachtouris
            recvlog.info('%d %s', r.status_code, r.status)
221 2f749e6e Stavros Sachtouris
            for key, val in r.headers.items():
222 2f749e6e Stavros Sachtouris
                recvlog.info('%s: %s', key, val)
223 bcb51856 Stavros Sachtouris
            if r.content:
224 1a3c18fd Stavros Sachtouris
                datarecvlog.info(r.content)
225 2f749e6e Stavros Sachtouris
226 b2c5b650 Stavros Sachtouris
        except (KamakiResponseError, KamakiConnectionError) as err:
227 1f417830 Stavros Sachtouris
            from traceback import format_stack
228 1f417830 Stavros Sachtouris
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
229 2f749e6e Stavros Sachtouris
            self.http_client.reset_headers()
230 2f749e6e Stavros Sachtouris
            self.http_client.reset_params()
231 1f417830 Stavros Sachtouris
            errstr = '%s' % err
232 1f417830 Stavros Sachtouris
            if not errstr:
233 1f417830 Stavros Sachtouris
                errstr = ('%s' % type(err))[7:-2]
234 de73876b Stavros Sachtouris
            status = getattr(err, 'status', getattr(err, 'errno', 0))
235 de73876b Stavros Sachtouris
            raise ClientError('%s\n' % errstr, status=status)
236 a52d2256 Stavros Sachtouris
237 a52d2256 Stavros Sachtouris
        self.http_client.reset_headers()
238 a52d2256 Stavros Sachtouris
        self.http_client.reset_params()
239 0238c167 Stavros Sachtouris
240 0238c167 Stavros Sachtouris
        if success is not None:
241 a037fd61 Stavros Sachtouris
            # Success can either be an int or a collection
242 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
243 0238c167 Stavros Sachtouris
            if r.status_code not in success:
244 0238c167 Stavros Sachtouris
                r.release()
245 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
246 d726b3d0 Stavros Sachtouris
        return r
247 a1c50326 Giorgos Verigakis
248 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
249 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
250 6a0b1658 Giorgos Verigakis
251 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
252 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
253 6a0b1658 Giorgos Verigakis
254 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
255 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
256 6a0b1658 Giorgos Verigakis
257 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
258 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
259 6a0b1658 Giorgos Verigakis
260 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
261 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
262 6a0b1658 Giorgos Verigakis
263 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
264 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
265 4adfa919 Stavros Sachtouris
266 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
267 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)