Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 773d61ca

History | View | Annotate | Download (9.2 kB)

1 43ca98ee Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 2b74ab4a Stavros Sachtouris
from threading import Thread
35 de4f08ef Stavros Sachtouris
from json import dumps, loads
36 cad39033 Stavros Sachtouris
from time import time
37 6a0b1658 Giorgos Verigakis
import logging
38 c270fe96 Stavros Sachtouris
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39 1f417830 Stavros Sachtouris
from kamaki.clients.connection.errors import HTTPConnectionError
40 1f417830 Stavros Sachtouris
from kamaki.clients.connection.errors import HTTPResponseError
41 6a0b1658 Giorgos Verigakis
42 6a0b1658 Giorgos Verigakis
sendlog = logging.getLogger('clients.send')
43 1a3c18fd Stavros Sachtouris
datasendlog = logging.getLogger('data.send')
44 6a0b1658 Giorgos Verigakis
recvlog = logging.getLogger('clients.recv')
45 1a3c18fd Stavros Sachtouris
datarecvlog = logging.getLogger('data.recv')
46 6a0b1658 Giorgos Verigakis
47 3dabe5d2 Stavros Sachtouris
48 a1c50326 Giorgos Verigakis
class ClientError(Exception):
49 4f989909 Stavros Sachtouris
    def __init__(self, message, status=0, details=None):
50 de4f08ef Stavros Sachtouris
        try:
51 1f417830 Stavros Sachtouris
            message += '' if message and message[-1] == '\n' else '\n'
52 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
53 062b1d0a Stavros Sachtouris
            new_msg = sep + new_msg
54 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
55 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
56 062b1d0a Stavros Sachtouris
57 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
58 062b1d0a Stavros Sachtouris
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
59 062b1d0a Stavros Sachtouris
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
60 de4f08ef Stavros Sachtouris
            if 'code' in json_msg:
61 de4f08ef Stavros Sachtouris
                status = json_msg['code']
62 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
63 de4f08ef Stavros Sachtouris
                if not details:
64 de4f08ef Stavros Sachtouris
                    details = []
65 de4f08ef Stavros Sachtouris
                elif not isinstance(details, list):
66 de4f08ef Stavros Sachtouris
                    details = [details]
67 de4f08ef Stavros Sachtouris
                if json_msg['details']:
68 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
69 de4f08ef Stavros Sachtouris
        except:
70 de4f08ef Stavros Sachtouris
            pass
71 de4f08ef Stavros Sachtouris
72 7e5415a4 Stavros Sachtouris
        super(ClientError, self).__init__(message)
73 a1c50326 Giorgos Verigakis
        self.status = status
74 4f989909 Stavros Sachtouris
        self.details = details if details else []
75 a1c50326 Giorgos Verigakis
76 3dabe5d2 Stavros Sachtouris
77 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
78 2b74ab4a Stavros Sachtouris
    """ Thread-run method(*args, **kwargs)
79 2b74ab4a Stavros Sachtouris
        put exception in exception_bucket
80 2b74ab4a Stavros Sachtouris
    """
81 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
82 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
83 2b74ab4a Stavros Sachtouris
        self.method = method
84 2b74ab4a Stavros Sachtouris
        self.args = args
85 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
86 2b74ab4a Stavros Sachtouris
87 2b74ab4a Stavros Sachtouris
    @property
88 2b74ab4a Stavros Sachtouris
    def exception(self):
89 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
90 2b74ab4a Stavros Sachtouris
91 2b74ab4a Stavros Sachtouris
    @property
92 2b74ab4a Stavros Sachtouris
    def value(self):
93 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
94 2b74ab4a Stavros Sachtouris
95 2b74ab4a Stavros Sachtouris
    def run(self):
96 2b74ab4a Stavros Sachtouris
        try:
97 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
98 2b74ab4a Stavros Sachtouris
        except Exception as e:
99 7644c38e Stavros Sachtouris
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
100 7644c38e Stavros Sachtouris
                self,
101 7644c38e Stavros Sachtouris
                type(e),
102 7644c38e Stavros Sachtouris
                e.status if isinstance(e, ClientError) else '',
103 7644c38e Stavros Sachtouris
                e))
104 2b74ab4a Stavros Sachtouris
            self._exception = e
105 2b74ab4a Stavros Sachtouris
106 6069b53b Stavros Sachtouris
107 6a0b1658 Giorgos Verigakis
class Client(object):
108 cccff590 Stavros Sachtouris
    POOL_SIZE = 7
109 cad39033 Stavros Sachtouris
110 5b263ba2 Stavros Sachtouris
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
111 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
112 6c62a96d Giorgos Verigakis
        self.token = token
113 e742badc Stavros Sachtouris
        self.headers = {}
114 16ce7b91 Stavros Sachtouris
        self.DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
115 16ce7b91 Stavros Sachtouris
            "%A, %d-%b-%y %H:%M:%S GMT",
116 16ce7b91 Stavros Sachtouris
            "%a, %d %b %Y %H:%M:%S GMT"]
117 a2e8e549 Stavros Sachtouris
        self.http_client = http_client
118 6c62a96d Giorgos Verigakis
119 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
120 cad39033 Stavros Sachtouris
        self._thread_limit = limit
121 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
122 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
123 cad39033 Stavros Sachtouris
124 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
125 16c895db Stavros Sachtouris
        recvlog.debug('# running threads: %s' % len(threadlist))
126 cad39033 Stavros Sachtouris
        if self._elapsed_old > self._elapsed_new\
127 cad39033 Stavros Sachtouris
        and self._thread_limit < self.POOL_SIZE:
128 cad39033 Stavros Sachtouris
            self._thread_limit += 1
129 cad39033 Stavros Sachtouris
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
130 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
131 cad39033 Stavros Sachtouris
132 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
133 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
134 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
135 cad39033 Stavros Sachtouris
            for thread in threadlist:
136 cad39033 Stavros Sachtouris
                begin_time = time()
137 cad39033 Stavros Sachtouris
                thread.join()
138 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
139 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
140 cad39033 Stavros Sachtouris
            return []
141 cad39033 Stavros Sachtouris
        return threadlist
142 cad39033 Stavros Sachtouris
143 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
144 7966ffb8 Stavros Sachtouris
        status_msg = getattr(r, 'status', '')
145 d804de82 Stavros Sachtouris
        try:
146 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
147 d804de82 Stavros Sachtouris
        except:
148 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
149 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
150 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
151 6a0b1658 Giorgos Verigakis
152 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
153 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
154 4adfa919 Stavros Sachtouris
        if value is not None and iff:
155 2f749e6e Stavros Sachtouris
            self.http_client.set_header(name, value)
156 2f749e6e Stavros Sachtouris
157 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
158 2f749e6e Stavros Sachtouris
        if iff:
159 2f749e6e Stavros Sachtouris
            self.http_client.set_param(name, value)
160 2f749e6e Stavros Sachtouris
161 2f749e6e Stavros Sachtouris
    def set_default_header(self, name, value):
162 2f749e6e Stavros Sachtouris
        self.http_client.headers.setdefault(name, value)
163 e742badc Stavros Sachtouris
164 6ce9fc72 Stavros Sachtouris
    def request(self,
165 6ce9fc72 Stavros Sachtouris
        method,
166 6ce9fc72 Stavros Sachtouris
        path,
167 6ce9fc72 Stavros Sachtouris
        async_headers={},
168 6ce9fc72 Stavros Sachtouris
        async_params={},
169 6ce9fc72 Stavros Sachtouris
        **kwargs):
170 9a7efb0d Stavros Sachtouris
        """In threaded/asynchronous requests, headers and params are not safe
171 3dabe5d2 Stavros Sachtouris
        Therefore, the standard self.set_header/param system can be used only
172 3dabe5d2 Stavros Sachtouris
        for headers and params that are common for all requests. All other
173 3dabe5d2 Stavros Sachtouris
        params and headers should passes as
174 9a7efb0d Stavros Sachtouris
        @param async_headers
175 9a7efb0d Stavros Sachtouris
        @async_params
176 3dabe5d2 Stavros Sachtouris
        E.g. in most queries the 'X-Auth-Token' header might be the same for
177 3dabe5d2 Stavros Sachtouris
        all, but the 'Range' header might be different from request to request.
178 9a7efb0d Stavros Sachtouris
        """
179 1785ad41 Stavros Sachtouris
180 d726b3d0 Stavros Sachtouris
        try:
181 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
182 2f749e6e Stavros Sachtouris
183 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
184 2f749e6e Stavros Sachtouris
            self.set_default_header('X-Auth-Token', self.token)
185 2f749e6e Stavros Sachtouris
186 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
187 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
188 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Type', 'application/json')
189 2f749e6e Stavros Sachtouris
            if data:
190 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Length', unicode(len(data)))
191 2f749e6e Stavros Sachtouris
192 a40e152f Stavros Sachtouris
            sendlog.info('perform a %s @ %s', method, self.base_url)
193 a40e152f Stavros Sachtouris
194 7d91734c Stavros Sachtouris
            self.http_client.url = self.base_url
195 7d91734c Stavros Sachtouris
            self.http_client.path = path
196 3dabe5d2 Stavros Sachtouris
            r = self.http_client.perform_request(method,
197 3dabe5d2 Stavros Sachtouris
                data,
198 3dabe5d2 Stavros Sachtouris
                async_headers,
199 3dabe5d2 Stavros Sachtouris
                async_params)
200 2f749e6e Stavros Sachtouris
201 2f749e6e Stavros Sachtouris
            req = self.http_client
202 5b263ba2 Stavros Sachtouris
            sendlog.info('%s %s', method, req.url)
203 1785ad41 Stavros Sachtouris
            headers = dict(req.headers)
204 1785ad41 Stavros Sachtouris
            headers.update(async_headers)
205 1785ad41 Stavros Sachtouris
206 1785ad41 Stavros Sachtouris
            for key, val in headers.items():
207 1785ad41 Stavros Sachtouris
                sendlog.info('\t%s: %s', key, val)
208 2f749e6e Stavros Sachtouris
            sendlog.info('')
209 2f749e6e Stavros Sachtouris
            if data:
210 1a3c18fd Stavros Sachtouris
                datasendlog.info(data)
211 2f749e6e Stavros Sachtouris
212 2f749e6e Stavros Sachtouris
            recvlog.info('%d %s', r.status_code, r.status)
213 2f749e6e Stavros Sachtouris
            for key, val in r.headers.items():
214 2f749e6e Stavros Sachtouris
                recvlog.info('%s: %s', key, val)
215 bcb51856 Stavros Sachtouris
            if r.content:
216 1a3c18fd Stavros Sachtouris
                datarecvlog.info(r.content)
217 2f749e6e Stavros Sachtouris
218 1f417830 Stavros Sachtouris
        except (HTTPResponseError, HTTPConnectionError) as err:
219 1f417830 Stavros Sachtouris
            from traceback import format_stack
220 1f417830 Stavros Sachtouris
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
221 2f749e6e Stavros Sachtouris
            self.http_client.reset_headers()
222 2f749e6e Stavros Sachtouris
            self.http_client.reset_params()
223 1f417830 Stavros Sachtouris
            errstr = '%s' % err
224 1f417830 Stavros Sachtouris
            if not errstr:
225 1f417830 Stavros Sachtouris
                errstr = ('%s' % type(err))[7:-2]
226 1f417830 Stavros Sachtouris
            raise ClientError('%s\n' % errstr,
227 67469d65 Stavros Sachtouris
                status=getattr(err, 'status', 0) or getattr(err, 'errno', 0))
228 a52d2256 Stavros Sachtouris
229 a52d2256 Stavros Sachtouris
        self.http_client.reset_headers()
230 a52d2256 Stavros Sachtouris
        self.http_client.reset_params()
231 0238c167 Stavros Sachtouris
232 0238c167 Stavros Sachtouris
        if success is not None:
233 0238c167 Stavros Sachtouris
            # Success can either be an in or a collection
234 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
235 0238c167 Stavros Sachtouris
            if r.status_code not in success:
236 0238c167 Stavros Sachtouris
                r.release()
237 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
238 d726b3d0 Stavros Sachtouris
        return r
239 a1c50326 Giorgos Verigakis
240 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
241 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
242 6a0b1658 Giorgos Verigakis
243 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
244 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
245 6a0b1658 Giorgos Verigakis
246 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
247 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
248 6a0b1658 Giorgos Verigakis
249 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
250 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
251 6a0b1658 Giorgos Verigakis
252 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
253 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
254 6a0b1658 Giorgos Verigakis
255 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
256 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
257 4adfa919 Stavros Sachtouris
258 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
259 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)