Revision 9a7efb0d

b/kamaki/clients/__init__.py
80 80
    def set_default_header(self, name, value):
81 81
        self.http_client.headers.setdefault(name, value)
82 82

  
83
    def request(self, method, path, **kwargs):
83
    def request(self, method, path, async_headers={}, async_params={}, **kwargs):
84
        """In threaded/asynchronous requests, headers and params are not safe
85
        Therefore, the standard self.set_header/param system can be used only for 
86
        headers and params that are common for all requests. All other params and
87
        headers should passes as
88
        @param async_headers
89
        @async_params
90
        E.g. in most queries the 'X-Auth-Token' header might be the same for all, but the
91
        'Range' header might be different from request to request.
92
        """
84 93
        try:
85 94
            success = kwargs.pop('success', 200)
86 95

  
87
            binary = kwargs.pop('binary', False)
88 96
            data = kwargs.pop('data', None)
89 97
            self.set_default_header('X-Auth-Token', self.token)
90 98

  
......
95 103
                self.set_default_header('Content-Length', unicode(len(data)))
96 104

  
97 105
            self.http_client.url = self.base_url + path
98
            r = self.http_client.perform_request(method=method, data=data)
106
            r = self.http_client.perform_request(method, data, async_headers, async_params)
99 107

  
100 108
            req = self.http_client
101 109
            sendlog.info('%s %s', method, req.url)
......
108 116
            recvlog.info('%d %s', r.status_code, r.status)
109 117
            for key, val in r.headers.items():
110 118
                recvlog.info('%s: %s', key, val)
111
            recvlog.info('')
112
            if r.content:
113
                recvlog.debug(r.content)
119
            #if r.content:
120
            #    recvlog.debug(r.content)
114 121

  
115 122
            if success is not None:
116 123
                # Success can either be an in or a collection
b/kamaki/clients/connection/__init__.py
170 170
    def set_method(self, method):
171 171
    	self.method = method
172 172

  
173
	def perform_request(self, method=None, url=None, params=None, headers=None, data=None):
173
	def perform_request(self, method=None, url=None, async_headers={}, async_params={}, data=None):
174 174
		"""
175 175
		@return an HTTPResponse (also in self.response of this object)
176 176
		named args offer the ability to reset a request or a part of the request
b/kamaki/clients/connection/kamakicon.py
59 59
        for k,v in r.getheaders():
60 60
            headers.update({k:v})
61 61
        self.headers = headers
62
        self.content = r.read()
62
        from copy import copy
63
        self.content = copy(r.read())
64
        #print('%s %s INSIDE[%s]'%(self.request, self, self.content[1:10]))
63 65
        self.status_code = r.status
64 66
        self.status = r.reason
65 67
        self.request.close()
......
90 92

  
91 93
class KamakiHTTPConnection(HTTPConnection):
92 94

  
93
    def _retrieve_connection_info(self):
95
    def _retrieve_connection_info(self, extra_params={}):
94 96
        """ return (scheme, netloc, url?with&params) """
95 97
        url = self.url
96
        for i,(key, val) in enumerate(self.params.items()):
98
        params = self.params
99
        for k,v in extra_params.items():
100
            params[k] = v
101
        for i,(key, val) in enumerate(params.items()):
97 102
            param_str = ('?' if i == 0 else '&') + unicode(key) 
98 103
            if val is not None:
99 104
                param_str+= '='+unicode(val)
......
103 108
        self.url = url
104 109
        return (parsed.scheme, parsed.netloc)
105 110

  
106
    def perform_request(self, method=None, data=None):
107
        (scheme, netloc) = self._retrieve_connection_info()
111
    def perform_request(self, method=None, data=None, async_headers={}, async_params={}):
112
        (scheme, netloc) = self._retrieve_connection_info(extra_params=async_params)
108 113
        #get connection from pool
109 114
        conn = get_http_connection(netloc=netloc, scheme=scheme)
115
        headers = self.headers
116
        for k,v in async_headers.items():
117
            headers[k] = v
110 118
        try:
111
            conn.request(method = method.upper(), url=self.url, headers=self.headers, body=data)
119
            conn.request(method = method.upper(), url=self.url, headers=headers, body=data)
112 120
        except:
113 121
            conn.close()
114 122
            raise
115 123
        return KamakiHTTPResponse(conn)
124

  
125

  
b/kamaki/clients/pithos.py
244 244
        file_hashmap = HashMap(blocksize, blockhash)
245 245
        file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
246 246

  
247
        #filter out blocks that are already downloaded
248 247
        for i, x in enumerate(file_hashmap):
249 248
            local_hash = hexlify(x)
250 249
            if local_hash in remote_hashes:
......
263 262
                    gevent.Greenlet._report_error(self, exc_info)
264 263
                finally:
265 264
                    sys.stderr = _stderr
266
        self.POOL_SIZE = 5
265
        if not hasattr(self, 'POOL_SIZE'):
266
            self.POOL_SIZE = 5
267 267
        if self.async_pool is None:
268 268
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
269 269
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
......
273 273
    def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
274 274
        finished = []
275 275
        for start, g in flying_greenlets.items():
276
            print('\tIs g ID(%s) ready? %s'%(self.mmaapp[start], g.ready()))
276 277
            if g.ready():
277 278
                if g.exception:
278
                    g.release()
279 279
                    raise g.exception
280 280
                try:
281 281
                    block = g.value.content
282 282
                except AttributeError:
283 283
                    broken[start] = flying_greenlets.pop(start)
284
                    #g.spawn()
284 285
                    continue
285 286
                local_file.seek(start)
287
                print('\tID(%s) [%s...]\n\tg.value:%s\n\tg:%s\n'%(self.mmaapp[start], block[1:10],
288
                    g.value, g))
289
                print('\tID(%s): g.value.request: %s\n---'%(self.mmaapp[start], g.value.request))
286 290
                local_file.write(block)
287 291
                #local_file.flush()
288 292
                self._cb_next()
......
291 295
        return finished
292 296

  
293 297
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
294

  
295
        #let the fly
296 298
        flying_greenlets = {}
297 299
        finished_greenlets = []
298 300
        broken = {}
301
        self.mmaapp = {}
299 302
        for block_hash, blockid in remote_hashes.items():
303
            if len(flying_greenlets) >= self.POOL_SIZE:
304
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
305
                    **restargs)
300 306
            start = blocksize*blockid
307
            self.mmaapp[start] = blockid
301 308
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
302
            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
303
            #store info for relaunching greenlet if needed
309
            restargs['async_headers'] = dict(data_range='bytes=%s-%s'%(start, end))
310
            print('ID(%s) get_grnlt {'%blockid)
304 311
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
305
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
306
                **restargs)
312
            print('ID(%s) got_grnlt }'%blockid)
307 313

  
308 314
        #check the greenlets
309 315
        while len(flying_greenlets) > 0:
......
333 339
            hash_list, 
334 340
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
335 341
        assert total_size >= 0
342
        self.POOL_SIZE = 5
336 343

  
337 344
        if download_cb:
338 345
            self.progress_bar_gen = download_cb(len(remote_hashes)+1)
b/kamaki/clients/pithos_rest_api.py
282 282
        success = kwargs.pop('success', 200)
283 283
        return self.head(path, *args, success=success, **kwargs)
284 284

  
285
    def object_get(self, object, format='json', hashmap=False, version=None, binary=False,
285
    def object_get(self, object, format='json', hashmap=False, version=None,
286 286
        data_range=None, if_range=False, if_etag_match=None, if_etag_not_match = None,
287 287
        if_modified_since = None, if_unmodified_since = None, *args, **kwargs):
288 288
        """ Full Pithos+ GET at object level
......
315 315

  
316 316
        path=path4url(self.account, self.container, object)
317 317
        success = kwargs.pop('success', 200)
318
        if binary:
319
            kwargs['binary'] = True
320 318
        return self.get(path, *args, success=success, **kwargs)
321 319

  
322 320
    def object_put(self, object, format='json', hashmap=False, delimiter = None, if_etag_match=None,

Also available in: Unified diff