Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ 2b74ab4a

History | View | Annotate | Download (8.4 kB)

1 43ca98ee Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 a1c50326 Giorgos Verigakis
#
3 a1c50326 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 a1c50326 Giorgos Verigakis
# without modification, are permitted provided that the following
5 a1c50326 Giorgos Verigakis
# conditions are met:
6 a1c50326 Giorgos Verigakis
#
7 a1c50326 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
9 a1c50326 Giorgos Verigakis
#      disclaimer.
10 a1c50326 Giorgos Verigakis
#
11 a1c50326 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 e742badc Stavros Sachtouris
#      copyright notice, self.list of conditions and the following
13 a1c50326 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 a1c50326 Giorgos Verigakis
#      provided with the distribution.
15 a1c50326 Giorgos Verigakis
#
16 a1c50326 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 a1c50326 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 a1c50326 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 a1c50326 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 a1c50326 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 a1c50326 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 a1c50326 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 a1c50326 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 a1c50326 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 a1c50326 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 a1c50326 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 a1c50326 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 a1c50326 Giorgos Verigakis
#
29 a1c50326 Giorgos Verigakis
# The views and conclusions contained in the software and
30 a1c50326 Giorgos Verigakis
# documentation are those of the authors and should not be
31 a1c50326 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 a1c50326 Giorgos Verigakis
# or implied, of GRNET S.A.
33 a1c50326 Giorgos Verigakis
34 2b74ab4a Stavros Sachtouris
from threading import Thread
35 de4f08ef Stavros Sachtouris
from json import dumps, loads
36 cad39033 Stavros Sachtouris
from time import time
37 6a0b1658 Giorgos Verigakis
import logging
38 c270fe96 Stavros Sachtouris
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
39 6a0b1658 Giorgos Verigakis
40 6a0b1658 Giorgos Verigakis
sendlog = logging.getLogger('clients.send')
41 6a0b1658 Giorgos Verigakis
recvlog = logging.getLogger('clients.recv')
42 6a0b1658 Giorgos Verigakis
43 3dabe5d2 Stavros Sachtouris
44 a1c50326 Giorgos Verigakis
class ClientError(Exception):
45 0238c167 Stavros Sachtouris
    def __init__(self, message, status=0, details=[]):
46 de4f08ef Stavros Sachtouris
        try:
47 062b1d0a Stavros Sachtouris
            serv_stat, sep, new_msg = message.partition('{')
48 062b1d0a Stavros Sachtouris
            new_msg = sep + new_msg
49 062b1d0a Stavros Sachtouris
            json_msg = loads(new_msg)
50 de4f08ef Stavros Sachtouris
            key = json_msg.keys()[0]
51 062b1d0a Stavros Sachtouris
52 de4f08ef Stavros Sachtouris
            json_msg = json_msg[key]
53 062b1d0a Stavros Sachtouris
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
54 062b1d0a Stavros Sachtouris
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
55 de4f08ef Stavros Sachtouris
            if 'code' in json_msg:
56 de4f08ef Stavros Sachtouris
                status = json_msg['code']
57 de4f08ef Stavros Sachtouris
            if 'details' in json_msg:
58 de4f08ef Stavros Sachtouris
                if not details:
59 de4f08ef Stavros Sachtouris
                    details = []
60 de4f08ef Stavros Sachtouris
                elif not isinstance(details, list):
61 de4f08ef Stavros Sachtouris
                    details = [details]
62 de4f08ef Stavros Sachtouris
                if json_msg['details']:
63 de4f08ef Stavros Sachtouris
                    details.append(json_msg['details'])
64 de4f08ef Stavros Sachtouris
        except:
65 de4f08ef Stavros Sachtouris
            pass
66 de4f08ef Stavros Sachtouris
67 7e5415a4 Stavros Sachtouris
        super(ClientError, self).__init__(message)
68 a1c50326 Giorgos Verigakis
        self.status = status
69 a1c50326 Giorgos Verigakis
        self.details = details
70 a1c50326 Giorgos Verigakis
71 3dabe5d2 Stavros Sachtouris
72 2b74ab4a Stavros Sachtouris
class SilentEvent(Thread):
73 2b74ab4a Stavros Sachtouris
    """ Thread-run method(*args, **kwargs)
74 2b74ab4a Stavros Sachtouris
        put exception in exception_bucket
75 2b74ab4a Stavros Sachtouris
    """
76 2b74ab4a Stavros Sachtouris
    def __init__(self, method, *args, **kwargs):
77 2b74ab4a Stavros Sachtouris
        super(self.__class__, self).__init__()
78 2b74ab4a Stavros Sachtouris
        self.method = method
79 2b74ab4a Stavros Sachtouris
        self.args = args
80 2b74ab4a Stavros Sachtouris
        self.kwargs = kwargs
81 2b74ab4a Stavros Sachtouris
82 2b74ab4a Stavros Sachtouris
    @property
83 2b74ab4a Stavros Sachtouris
    def exception(self):
84 2b74ab4a Stavros Sachtouris
        return getattr(self, '_exception', False)
85 2b74ab4a Stavros Sachtouris
86 2b74ab4a Stavros Sachtouris
    @property
87 2b74ab4a Stavros Sachtouris
    def value(self):
88 2b74ab4a Stavros Sachtouris
        return getattr(self, '_value', None)
89 2b74ab4a Stavros Sachtouris
90 2b74ab4a Stavros Sachtouris
    def run(self):
91 2b74ab4a Stavros Sachtouris
        try:
92 2b74ab4a Stavros Sachtouris
            self._value = self.method(*(self.args), **(self.kwargs))
93 2b74ab4a Stavros Sachtouris
        except Exception as e:
94 2b74ab4a Stavros Sachtouris
            print('______\n%s\n_______' % e)
95 2b74ab4a Stavros Sachtouris
            self._exception = e
96 2b74ab4a Stavros Sachtouris
97 6a0b1658 Giorgos Verigakis
class Client(object):
98 cccff590 Stavros Sachtouris
    POOL_SIZE = 7
99 cad39033 Stavros Sachtouris
100 5b263ba2 Stavros Sachtouris
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
101 6a0b1658 Giorgos Verigakis
        self.base_url = base_url
102 6c62a96d Giorgos Verigakis
        self.token = token
103 e742badc Stavros Sachtouris
        self.headers = {}
104 16ce7b91 Stavros Sachtouris
        self.DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
105 16ce7b91 Stavros Sachtouris
            "%A, %d-%b-%y %H:%M:%S GMT",
106 16ce7b91 Stavros Sachtouris
            "%a, %d %b %Y %H:%M:%S GMT"]
107 a2e8e549 Stavros Sachtouris
        self.http_client = http_client
108 6c62a96d Giorgos Verigakis
109 cad39033 Stavros Sachtouris
    def _init_thread_limit(self, limit=1):
110 cad39033 Stavros Sachtouris
        self._thread_limit = limit
111 cad39033 Stavros Sachtouris
        self._elapsed_old = 0.0
112 cad39033 Stavros Sachtouris
        self._elapsed_new = 0.0
113 cad39033 Stavros Sachtouris
114 cad39033 Stavros Sachtouris
    def _watch_thread_limit(self, threadlist):
115 cad39033 Stavros Sachtouris
        if self._elapsed_old > self._elapsed_new\
116 cad39033 Stavros Sachtouris
        and self._thread_limit < self.POOL_SIZE:
117 cad39033 Stavros Sachtouris
            self._thread_limit += 1
118 cad39033 Stavros Sachtouris
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
119 cad39033 Stavros Sachtouris
            self._thread_limit -= 1
120 cad39033 Stavros Sachtouris
121 cad39033 Stavros Sachtouris
        self._elapsed_old = self._elapsed_new
122 cad39033 Stavros Sachtouris
        if len(threadlist) >= self._thread_limit:
123 cad39033 Stavros Sachtouris
            self._elapsed_new = 0.0
124 cad39033 Stavros Sachtouris
            for thread in threadlist:
125 cad39033 Stavros Sachtouris
                begin_time = time()
126 cad39033 Stavros Sachtouris
                thread.join()
127 cad39033 Stavros Sachtouris
                self._elapsed_new += time() - begin_time
128 cad39033 Stavros Sachtouris
            self._elapsed_new = self._elapsed_new / len(threadlist)
129 cad39033 Stavros Sachtouris
            return []
130 cad39033 Stavros Sachtouris
        return threadlist
131 cad39033 Stavros Sachtouris
132 6ad245d5 Stavros Sachtouris
    def _raise_for_status(self, r):
133 7966ffb8 Stavros Sachtouris
        status_msg = getattr(r, 'status', '')
134 d804de82 Stavros Sachtouris
        try:
135 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r.text)
136 d804de82 Stavros Sachtouris
        except:
137 7966ffb8 Stavros Sachtouris
            message = '%s %s\n' % (status_msg, r)
138 7966ffb8 Stavros Sachtouris
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
139 7966ffb8 Stavros Sachtouris
        raise ClientError(message, status=status)
140 6a0b1658 Giorgos Verigakis
141 4adfa919 Stavros Sachtouris
    def set_header(self, name, value, iff=True):
142 3dabe5d2 Stavros Sachtouris
        """Set a header 'name':'value'"""
143 4adfa919 Stavros Sachtouris
        if value is not None and iff:
144 2f749e6e Stavros Sachtouris
            self.http_client.set_header(name, value)
145 2f749e6e Stavros Sachtouris
146 2f749e6e Stavros Sachtouris
    def set_param(self, name, value=None, iff=True):
147 2f749e6e Stavros Sachtouris
        if iff:
148 2f749e6e Stavros Sachtouris
            self.http_client.set_param(name, value)
149 2f749e6e Stavros Sachtouris
150 2f749e6e Stavros Sachtouris
    def set_default_header(self, name, value):
151 2f749e6e Stavros Sachtouris
        self.http_client.headers.setdefault(name, value)
152 e742badc Stavros Sachtouris
153 6ce9fc72 Stavros Sachtouris
    def request(self,
154 6ce9fc72 Stavros Sachtouris
        method,
155 6ce9fc72 Stavros Sachtouris
        path,
156 6ce9fc72 Stavros Sachtouris
        async_headers={},
157 6ce9fc72 Stavros Sachtouris
        async_params={},
158 6ce9fc72 Stavros Sachtouris
        **kwargs):
159 9a7efb0d Stavros Sachtouris
        """In threaded/asynchronous requests, headers and params are not safe
160 3dabe5d2 Stavros Sachtouris
        Therefore, the standard self.set_header/param system can be used only
161 3dabe5d2 Stavros Sachtouris
        for headers and params that are common for all requests. All other
162 3dabe5d2 Stavros Sachtouris
        params and headers should passes as
163 9a7efb0d Stavros Sachtouris
        @param async_headers
164 9a7efb0d Stavros Sachtouris
        @async_params
165 3dabe5d2 Stavros Sachtouris
        E.g. in most queries the 'X-Auth-Token' header might be the same for
166 3dabe5d2 Stavros Sachtouris
        all, but the 'Range' header might be different from request to request.
167 9a7efb0d Stavros Sachtouris
        """
168 1785ad41 Stavros Sachtouris
169 d726b3d0 Stavros Sachtouris
        try:
170 2f749e6e Stavros Sachtouris
            success = kwargs.pop('success', 200)
171 2f749e6e Stavros Sachtouris
172 2f749e6e Stavros Sachtouris
            data = kwargs.pop('data', None)
173 2f749e6e Stavros Sachtouris
            self.set_default_header('X-Auth-Token', self.token)
174 2f749e6e Stavros Sachtouris
175 2f749e6e Stavros Sachtouris
            if 'json' in kwargs:
176 de4f08ef Stavros Sachtouris
                data = dumps(kwargs.pop('json'))
177 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Type', 'application/json')
178 2f749e6e Stavros Sachtouris
            if data:
179 2f749e6e Stavros Sachtouris
                self.set_default_header('Content-Length', unicode(len(data)))
180 2f749e6e Stavros Sachtouris
181 7d91734c Stavros Sachtouris
            self.http_client.url = self.base_url
182 7d91734c Stavros Sachtouris
            self.http_client.path = path
183 3dabe5d2 Stavros Sachtouris
            r = self.http_client.perform_request(method,
184 3dabe5d2 Stavros Sachtouris
                data,
185 3dabe5d2 Stavros Sachtouris
                async_headers,
186 3dabe5d2 Stavros Sachtouris
                async_params)
187 2f749e6e Stavros Sachtouris
188 2f749e6e Stavros Sachtouris
            req = self.http_client
189 5b263ba2 Stavros Sachtouris
            sendlog.info('%s %s', method, req.url)
190 1785ad41 Stavros Sachtouris
            headers = dict(req.headers)
191 1785ad41 Stavros Sachtouris
            headers.update(async_headers)
192 1785ad41 Stavros Sachtouris
193 1785ad41 Stavros Sachtouris
            for key, val in headers.items():
194 1785ad41 Stavros Sachtouris
                sendlog.info('\t%s: %s', key, val)
195 2f749e6e Stavros Sachtouris
            sendlog.info('')
196 2f749e6e Stavros Sachtouris
            if data:
197 2f749e6e Stavros Sachtouris
                sendlog.info('%s', data)
198 2f749e6e Stavros Sachtouris
199 2f749e6e Stavros Sachtouris
            recvlog.info('%d %s', r.status_code, r.status)
200 2f749e6e Stavros Sachtouris
            for key, val in r.headers.items():
201 2f749e6e Stavros Sachtouris
                recvlog.info('%s: %s', key, val)
202 bcb51856 Stavros Sachtouris
            if r.content:
203 bcb51856 Stavros Sachtouris
                recvlog.debug(r.content)
204 2f749e6e Stavros Sachtouris
205 58821ef5 Stavros Sachtouris
        except Exception as err:
206 bcb51856 Stavros Sachtouris
            from traceback import print_stack
207 bcb51856 Stavros Sachtouris
            recvlog.debug(print_stack)
208 2f749e6e Stavros Sachtouris
            self.http_client.reset_headers()
209 2f749e6e Stavros Sachtouris
            self.http_client.reset_params()
210 7e5415a4 Stavros Sachtouris
            raise ClientError('%s' % err, status=getattr(err, 'status', 0))
211 a52d2256 Stavros Sachtouris
212 a52d2256 Stavros Sachtouris
        self.http_client.reset_headers()
213 a52d2256 Stavros Sachtouris
        self.http_client.reset_params()
214 0238c167 Stavros Sachtouris
215 0238c167 Stavros Sachtouris
        if success is not None:
216 0238c167 Stavros Sachtouris
            # Success can either be an in or a collection
217 0238c167 Stavros Sachtouris
            success = (success,) if isinstance(success, int) else success
218 0238c167 Stavros Sachtouris
            if r.status_code not in success:
219 0238c167 Stavros Sachtouris
                r.release()
220 0238c167 Stavros Sachtouris
                self._raise_for_status(r)
221 d726b3d0 Stavros Sachtouris
        return r
222 a1c50326 Giorgos Verigakis
223 6a0b1658 Giorgos Verigakis
    def delete(self, path, **kwargs):
224 6a0b1658 Giorgos Verigakis
        return self.request('delete', path, **kwargs)
225 6a0b1658 Giorgos Verigakis
226 6a0b1658 Giorgos Verigakis
    def get(self, path, **kwargs):
227 6a0b1658 Giorgos Verigakis
        return self.request('get', path, **kwargs)
228 6a0b1658 Giorgos Verigakis
229 6a0b1658 Giorgos Verigakis
    def head(self, path, **kwargs):
230 6a0b1658 Giorgos Verigakis
        return self.request('head', path, **kwargs)
231 6a0b1658 Giorgos Verigakis
232 6a0b1658 Giorgos Verigakis
    def post(self, path, **kwargs):
233 6a0b1658 Giorgos Verigakis
        return self.request('post', path, **kwargs)
234 6a0b1658 Giorgos Verigakis
235 6a0b1658 Giorgos Verigakis
    def put(self, path, **kwargs):
236 6a0b1658 Giorgos Verigakis
        return self.request('put', path, **kwargs)
237 6a0b1658 Giorgos Verigakis
238 4adfa919 Stavros Sachtouris
    def copy(self, path, **kwargs):
239 4adfa919 Stavros Sachtouris
        return self.request('copy', path, **kwargs)
240 4adfa919 Stavros Sachtouris
241 4adfa919 Stavros Sachtouris
    def move(self, path, **kwargs):
242 4adfa919 Stavros Sachtouris
        return self.request('move', path, **kwargs)