Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / __init__.py @ cccff590

History | View | Annotate | Download (7.7 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, self.list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, self.list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from json import dumps, loads
35
from time import time
36
import logging
37
from kamaki.clients.connection.kamakicon import KamakiHTTPConnection
38

    
39
sendlog = logging.getLogger('clients.send')
40
recvlog = logging.getLogger('clients.recv')
41

    
42

    
43
class ClientError(Exception):
44
    def __init__(self, message, status=0, details=[]):
45
        try:
46
            serv_stat, sep, new_msg = message.partition('{')
47
            new_msg = sep + new_msg
48
            json_msg = loads(new_msg)
49
            key = json_msg.keys()[0]
50

    
51
            json_msg = json_msg[key]
52
            message = '%s %s (%s)\n' % (serv_stat, key, json_msg['message'])\
53
                if 'message' in json_msg else '%s %s' % (serv_stat, key)
54
            if 'code' in json_msg:
55
                status = json_msg['code']
56
            if 'details' in json_msg:
57
                if not details:
58
                    details = []
59
                elif not isinstance(details, list):
60
                    details = [details]
61
                if json_msg['details']:
62
                    details.append(json_msg['details'])
63
        except:
64
            pass
65

    
66
        super(ClientError, self).__init__(message)
67
        self.status = status
68
        self.details = details
69

    
70

    
71
class Client(object):
72
    POOL_SIZE = 7
73

    
74
    def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
75
        self.base_url = base_url
76
        self.token = token
77
        self.headers = {}
78
        self.DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
79
            "%A, %d-%b-%y %H:%M:%S GMT",
80
            "%a, %d %b %Y %H:%M:%S GMT"]
81
        self.http_client = http_client
82

    
83
    def _init_thread_limit(self, limit=1):
84
        self._thread_limit = limit
85
        self._elapsed_old = 0.0
86
        self._elapsed_new = 0.0
87

    
88
    def _watch_thread_limit(self, threadlist):
89
        if self._elapsed_old > self._elapsed_new\
90
        and self._thread_limit < self.POOL_SIZE:
91
            self._thread_limit += 1
92
        elif self._elapsed_old < self._elapsed_new and self._thread_limit > 1:
93
            self._thread_limit -= 1
94

    
95
        self._elapsed_old = self._elapsed_new
96
        if len(threadlist) >= self._thread_limit:
97
            self._elapsed_new = 0.0
98
            for thread in threadlist:
99
                begin_time = time()
100
                thread.join()
101
                self._elapsed_new += time() - begin_time
102
            self._elapsed_new = self._elapsed_new / len(threadlist)
103
            return []
104
        return threadlist
105

    
106
    def _raise_for_status(self, r):
107
        status_msg = getattr(r, 'status', '')
108
        try:
109
            message = '%s %s\n' % (status_msg, r.text)
110
        except:
111
            message = '%s %s\n' % (status_msg, r)
112
        status = getattr(r, 'status_code', getattr(r, 'status', 0))
113
        raise ClientError(message, status=status)
114

    
115
    def set_header(self, name, value, iff=True):
116
        """Set a header 'name':'value'"""
117
        if value is not None and iff:
118
            self.http_client.set_header(name, value)
119

    
120
    def set_param(self, name, value=None, iff=True):
121
        if iff:
122
            self.http_client.set_param(name, value)
123

    
124
    def set_default_header(self, name, value):
125
        self.http_client.headers.setdefault(name, value)
126

    
127
    def request(self,
128
        method,
129
        path,
130
        async_headers={},
131
        async_params={},
132
        **kwargs):
133
        """In threaded/asynchronous requests, headers and params are not safe
134
        Therefore, the standard self.set_header/param system can be used only
135
        for headers and params that are common for all requests. All other
136
        params and headers should passes as
137
        @param async_headers
138
        @async_params
139
        E.g. in most queries the 'X-Auth-Token' header might be the same for
140
        all, but the 'Range' header might be different from request to request.
141
        """
142

    
143
        try:
144
            success = kwargs.pop('success', 200)
145

    
146
            data = kwargs.pop('data', None)
147
            self.set_default_header('X-Auth-Token', self.token)
148

    
149
            if 'json' in kwargs:
150
                data = dumps(kwargs.pop('json'))
151
                self.set_default_header('Content-Type', 'application/json')
152
            if data:
153
                self.set_default_header('Content-Length', unicode(len(data)))
154

    
155
            self.http_client.url = self.base_url
156
            self.http_client.path = path
157
            r = self.http_client.perform_request(method,
158
                data,
159
                async_headers,
160
                async_params)
161

    
162
            req = self.http_client
163
            sendlog.info('%s %s', method, req.url)
164
            headers = dict(req.headers)
165
            headers.update(async_headers)
166

    
167
            for key, val in headers.items():
168
                sendlog.info('\t%s: %s', key, val)
169
            sendlog.info('')
170
            if data:
171
                sendlog.info('%s', data)
172

    
173
            recvlog.info('%d %s', r.status_code, r.status)
174
            for key, val in r.headers.items():
175
                recvlog.info('%s: %s', key, val)
176
            if r.content:
177
                recvlog.debug(r.content)
178

    
179
        except Exception as err:
180
            from traceback import print_stack
181
            recvlog.debug(print_stack)
182
            self.http_client.reset_headers()
183
            self.http_client.reset_params()
184
            raise ClientError('%s' % err, status=getattr(err, 'status', 0))
185

    
186
        self.http_client.reset_headers()
187
        self.http_client.reset_params()
188

    
189
        if success is not None:
190
            # Success can either be an in or a collection
191
            success = (success,) if isinstance(success, int) else success
192
            if r.status_code not in success:
193
                r.release()
194
                self._raise_for_status(r)
195
        return r
196

    
197
    def delete(self, path, **kwargs):
198
        return self.request('delete', path, **kwargs)
199

    
200
    def get(self, path, **kwargs):
201
        return self.request('get', path, **kwargs)
202

    
203
    def head(self, path, **kwargs):
204
        return self.request('head', path, **kwargs)
205

    
206
    def post(self, path, **kwargs):
207
        return self.request('post', path, **kwargs)
208

    
209
    def put(self, path, **kwargs):
210
        return self.request('put', path, **kwargs)
211

    
212
    def copy(self, path, **kwargs):
213
        return self.request('copy', path, **kwargs)
214

    
215
    def move(self, path, **kwargs):
216
        return self.request('move', path, **kwargs)