Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ dd7461ac

History | View | Annotate | Download (9.4 kB)

1 43ca98ee Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 fce31e83 Stavros Sachtouris
from urllib2 import quote
35 2b74ab4a Stavros Sachtouris
from threading import Thread
36 de4f08ef Stavros Sachtouris
from json import dumps, loads
37 cad39033 Stavros Sachtouris
from time import time
38 6a0b1658 Giorgos Verigakis
import logging
39 c270fe96 Stavros Sachtouris
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
40 1f417830 Stavros Sachtouris
from kamaki.clients.connection.errors import HTTPConnectionError
41 1f417830 Stavros Sachtouris
from kamaki.clients.connection.errors import HTTPResponseError
42 6a0b1658 Giorgos Verigakis
43 6a0b1658 Giorgos Verigakis
sendlog = logging.getLogger('clients.send')
44 1a3c18fd Stavros Sachtouris
datasendlog = logging.getLogger('data.send')
45 6a0b1658 Giorgos Verigakis
recvlog = logging.getLogger('clients.recv')
46 1a3c18fd Stavros Sachtouris
datarecvlog = logging.getLogger('data.recv')
47 6a0b1658 Giorgos Verigakis
48 3dabe5d2 Stavros Sachtouris
49 a1c50326 Giorgos Verigakis
class ClientError(Exception):
50 4f989909 Stavros Sachtouris
    def __init__(self, message, status=0, details=None):
51 de4f08ef Stavros Sachtouris
        try:
52 1f417830 Stavros Sachtouris
            message += '' if message and message[-1] == '\n' else '\n'
53 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
54 062b1d0a Stavros Sachtouris
            new_msg = sep + new_msg
55 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
56 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
57 062b1d0a Stavros Sachtouris
58 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
59 062b1d0a Stavros Sachtouris
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
60 062b1d0a Stavros Sachtouris
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
61 de4f08ef Stavros Sachtouris
            if 'code' in json_msg:
62 de4f08ef Stavros Sachtouris
                status = json_msg['code']
63 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
64 de4f08ef Stavros Sachtouris
                if not details:
65 de4f08ef Stavros Sachtouris
                    details = []
66 de4f08ef Stavros Sachtouris
                elif not isinstance(details, list):
67 de4f08ef Stavros Sachtouris
                    details = [details]
68 de4f08ef Stavros Sachtouris
                if json_msg['details']:
69 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
70 de4f08ef Stavros Sachtouris
        except:
71 de4f08ef Stavros Sachtouris
            pass
72 de4f08ef Stavros Sachtouris
73 7e5415a4 Stavros Sachtouris
        super(ClientError, self).__init__(message)
74 a1c50326 Giorgos Verigakis
        self.status = status
75 4f989909 Stavros Sachtouris
        self.details = details if details else []
76 a1c50326 Giorgos Verigakis
77 3dabe5d2 Stavros Sachtouris
78 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
79 2b74ab4a Stavros Sachtouris
    """ Thread-run method(*args, **kwargs)
80 2b74ab4a Stavros Sachtouris
        put exception in exception_bucket
81 2b74ab4a Stavros Sachtouris
    """
82 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
83 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
84 2b74ab4a Stavros Sachtouris
        self.method = method
85 2b74ab4a Stavros Sachtouris
        self.args = args
86 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
87 2b74ab4a Stavros Sachtouris
88 2b74ab4a Stavros Sachtouris
    @property
89 2b74ab4a Stavros Sachtouris
    def exception(self):
90 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
91 2b74ab4a Stavros Sachtouris
92 2b74ab4a Stavros Sachtouris
    @property
93 2b74ab4a Stavros Sachtouris
    def value(self):
94 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
95 2b74ab4a Stavros Sachtouris
96 2b74ab4a Stavros Sachtouris
    def run(self):
97 2b74ab4a Stavros Sachtouris
        try:
98 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
99 2b74ab4a Stavros Sachtouris
        except Exception as e:
100 7644c38e Stavros Sachtouris
            recvlog.debug('Thread %s got exception %s\n<%s %s' % (
101 7644c38e Stavros Sachtouris
                self,
102 7644c38e Stavros Sachtouris
                type(e),
103 7644c38e Stavros Sachtouris
                e.status if isinstance(e, ClientError) else '',
104 7644c38e Stavros Sachtouris
                e))
105 2b74ab4a Stavros Sachtouris
            self._exception = e
106 2b74ab4a Stavros Sachtouris
107 6069b53b Stavros Sachtouris
108 6a0b1658 Giorgos Verigakis
class Client(object):
109 cccff590 Stavros Sachtouris
    POOL_SIZE = 7
110 cad39033 Stavros Sachtouris
111 5b263ba2 Stavros Sachtouris
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
112 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
113 6c62a96d Giorgos Verigakis
        self.token = token
114 e742badc Stavros Sachtouris
        self.headers = {}
115 de73876b Stavros Sachtouris
        self.DATE_FORMATS = [
116 de73876b Stavros Sachtouris
            '%a %b %d %H:%M:%S %Y',
117 de73876b Stavros Sachtouris
            '%A, %d-%b-%y %H:%M:%S GMT',
118 de73876b Stavros Sachtouris
            '%a, %d %b %Y %H:%M:%S GMT']
119 a2e8e549 Stavros Sachtouris
        self.http_client = http_client
120 6c62a96d Giorgos Verigakis
121 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
122 cad39033 Stavros Sachtouris
        self._thread_limit = limit
123 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
124 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
125 cad39033 Stavros Sachtouris
126 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
127 16c895db Stavros Sachtouris
        recvlog.debug('# running threads: %s' % len(threadlist))
128 24ff0a35 Stavros Sachtouris
        if (self._elapsed_old > self._elapsed_new) and (
129 2005b18e Stavros Sachtouris
                self._thread_limit < self.POOL_SIZE):
130 2005b18e Stavros Sachtouris
            self._thread_limit += 1
131 cad39033 Stavros Sachtouris
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
132 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
133 cad39033 Stavros Sachtouris
134 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
135 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
136 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
137 cad39033 Stavros Sachtouris
            for thread in threadlist:
138 cad39033 Stavros Sachtouris
                begin_time = time()
139 cad39033 Stavros Sachtouris
                thread.join()
140 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
141 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
142 cad39033 Stavros Sachtouris
            return []
143 cad39033 Stavros Sachtouris
        return threadlist
144 cad39033 Stavros Sachtouris
145 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
146 7966ffb8 Stavros Sachtouris
        status_msg = getattr(r, 'status', '')
147 d804de82 Stavros Sachtouris
        try:
148 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
149 d804de82 Stavros Sachtouris
        except:
150 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
151 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
152 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
153 6a0b1658 Giorgos Verigakis
154 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
155 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
156 4adfa919 Stavros Sachtouris
        if value is not None and iff:
157 2f749e6e Stavros Sachtouris
            self.http_client.set_header(name, value)
158 2f749e6e Stavros Sachtouris
159 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
160 2f749e6e Stavros Sachtouris
        if iff:
161 2f749e6e Stavros Sachtouris
            self.http_client.set_param(name, value)
162 2f749e6e Stavros Sachtouris
163 2f749e6e Stavros Sachtouris
    def set_default_header(self, name, value):
164 2f749e6e Stavros Sachtouris
        self.http_client.headers.setdefault(name, value)
165 e742badc Stavros Sachtouris
166 de73876b Stavros Sachtouris
    def request(
167 24ff0a35 Stavros Sachtouris
            self,
168 24ff0a35 Stavros Sachtouris
            method,
169 24ff0a35 Stavros Sachtouris
            path,
170 24ff0a35 Stavros Sachtouris
            async_headers={},
171 24ff0a35 Stavros Sachtouris
            async_params={},
172 24ff0a35 Stavros Sachtouris
            **kwargs):
173 9a7efb0d Stavros Sachtouris
        """In threaded/asynchronous requests, headers and params are not safe
174 3dabe5d2 Stavros Sachtouris
        Therefore, the standard self.set_header/param system can be used only
175 3dabe5d2 Stavros Sachtouris
        for headers and params that are common for all requests. All other
176 3dabe5d2 Stavros Sachtouris
        params and headers should passes as
177 9a7efb0d Stavros Sachtouris
        @param async_headers
178 9a7efb0d Stavros Sachtouris
        @async_params
179 3dabe5d2 Stavros Sachtouris
        E.g. in most queries the 'X-Auth-Token' header might be the same for
180 3dabe5d2 Stavros Sachtouris
        all, but the 'Range' header might be different from request to request.
181 9a7efb0d Stavros Sachtouris
        """
182 d726b3d0 Stavros Sachtouris
        try:
183 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
184 2f749e6e Stavros Sachtouris
185 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
186 2f749e6e Stavros Sachtouris
            self.set_default_header('X-Auth-Token', self.token)
187 2f749e6e Stavros Sachtouris
188 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
189 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
190 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Type', 'application/json')
191 2f749e6e Stavros Sachtouris
            if data:
192 a517ff50 Stavros Sachtouris
                self.set_default_header('Content-Length', '%s' % len(data))
193 2f749e6e Stavros Sachtouris
194 a40e152f Stavros Sachtouris
            sendlog.info('perform a %s @ %s', method, self.base_url)
195 a40e152f Stavros Sachtouris
196 dd7461ac Stavros Sachtouris
            self.http_client.url = self.base_url + (
197 dd7461ac Stavros Sachtouris
                '/' if (self.base_url and self.base_url[-1]) != '/' else '')
198 4de960c7 Stavros Sachtouris
            self.http_client.path = quote(path.encode('utf8'))
199 de73876b Stavros Sachtouris
            r = self.http_client.perform_request(
200 de73876b Stavros Sachtouris
                method,
201 3dabe5d2 Stavros Sachtouris
                data,
202 3dabe5d2 Stavros Sachtouris
                async_headers,
203 3dabe5d2 Stavros Sachtouris
                async_params)
204 2f749e6e Stavros Sachtouris
205 2f749e6e Stavros Sachtouris
            req = self.http_client
206 5b263ba2 Stavros Sachtouris
            sendlog.info('%s %s', method, req.url)
207 1785ad41 Stavros Sachtouris
            headers = dict(req.headers)
208 1785ad41 Stavros Sachtouris
            headers.update(async_headers)
209 1785ad41 Stavros Sachtouris
210 1785ad41 Stavros Sachtouris
            for key, val in headers.items():
211 1785ad41 Stavros Sachtouris
                sendlog.info('\t%s: %s', key, val)
212 2f749e6e Stavros Sachtouris
            sendlog.info('')
213 2f749e6e Stavros Sachtouris
            if data:
214 1a3c18fd Stavros Sachtouris
                datasendlog.info(data)
215 2f749e6e Stavros Sachtouris
216 2f749e6e Stavros Sachtouris
            recvlog.info('%d %s', r.status_code, r.status)
217 2f749e6e Stavros Sachtouris
            for key, val in r.headers.items():
218 2f749e6e Stavros Sachtouris
                recvlog.info('%s: %s', key, val)
219 bcb51856 Stavros Sachtouris
            if r.content:
220 1a3c18fd Stavros Sachtouris
                datarecvlog.info(r.content)
221 2f749e6e Stavros Sachtouris
222 1f417830 Stavros Sachtouris
        except (HTTPResponseError, HTTPConnectionError) as err:
223 1f417830 Stavros Sachtouris
            from traceback import format_stack
224 1f417830 Stavros Sachtouris
            recvlog.debug('\n'.join(['%s' % type(err)] + format_stack()))
225 2f749e6e Stavros Sachtouris
            self.http_client.reset_headers()
226 2f749e6e Stavros Sachtouris
            self.http_client.reset_params()
227 1f417830 Stavros Sachtouris
            errstr = '%s' % err
228 1f417830 Stavros Sachtouris
            if not errstr:
229 1f417830 Stavros Sachtouris
                errstr = ('%s' % type(err))[7:-2]
230 de73876b Stavros Sachtouris
            status = getattr(err, 'status', getattr(err, 'errno', 0))
231 de73876b Stavros Sachtouris
            raise ClientError('%s\n' % errstr, status=status)
232 a52d2256 Stavros Sachtouris
233 a52d2256 Stavros Sachtouris
        self.http_client.reset_headers()
234 a52d2256 Stavros Sachtouris
        self.http_client.reset_params()
235 0238c167 Stavros Sachtouris
236 0238c167 Stavros Sachtouris
        if success is not None:
237 0238c167 Stavros Sachtouris
            # Success can either be an in or a collection
238 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
239 0238c167 Stavros Sachtouris
            if r.status_code not in success:
240 0238c167 Stavros Sachtouris
                r.release()
241 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
242 d726b3d0 Stavros Sachtouris
        return r
243 a1c50326 Giorgos Verigakis
244 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
245 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
246 6a0b1658 Giorgos Verigakis
247 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
248 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
249 6a0b1658 Giorgos Verigakis
250 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
251 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
252 6a0b1658 Giorgos Verigakis
253 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
254 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
255 6a0b1658 Giorgos Verigakis
256 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
257 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
258 6a0b1658 Giorgos Verigakis
259 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
260 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
261 4adfa919 Stavros Sachtouris
262 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
263 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)