Revision 9a7efb0d
b/kamaki/clients/__init__.py | ||
---|---|---|
80 | 80 |
def set_default_header(self, name, value): |
81 | 81 |
self.http_client.headers.setdefault(name, value) |
82 | 82 |
|
83 |
def request(self, method, path, **kwargs): |
|
83 |
def request(self, method, path, async_headers={}, async_params={}, **kwargs): |
|
84 |
"""In threaded/asynchronous requests, headers and params are not safe |
|
85 |
Therefore, the standard self.set_header/param system can be used only for |
|
86 |
headers and params that are common for all requests. All other params and |
|
87 |
headers should passes as |
|
88 |
@param async_headers |
|
89 |
@async_params |
|
90 |
E.g. in most queries the 'X-Auth-Token' header might be the same for all, but the |
|
91 |
'Range' header might be different from request to request. |
|
92 |
""" |
|
84 | 93 |
try: |
85 | 94 |
success = kwargs.pop('success', 200) |
86 | 95 |
|
87 |
binary = kwargs.pop('binary', False) |
|
88 | 96 |
data = kwargs.pop('data', None) |
89 | 97 |
self.set_default_header('X-Auth-Token', self.token) |
90 | 98 |
|
... | ... | |
95 | 103 |
self.set_default_header('Content-Length', unicode(len(data))) |
96 | 104 |
|
97 | 105 |
self.http_client.url = self.base_url + path |
98 |
r = self.http_client.perform_request(method=method, data=data)
|
|
106 |
r = self.http_client.perform_request(method, data, async_headers, async_params)
|
|
99 | 107 |
|
100 | 108 |
req = self.http_client |
101 | 109 |
sendlog.info('%s %s', method, req.url) |
... | ... | |
108 | 116 |
recvlog.info('%d %s', r.status_code, r.status) |
109 | 117 |
for key, val in r.headers.items(): |
110 | 118 |
recvlog.info('%s: %s', key, val) |
111 |
recvlog.info('') |
|
112 |
if r.content: |
|
113 |
recvlog.debug(r.content) |
|
119 |
#if r.content: |
|
120 |
# recvlog.debug(r.content) |
|
114 | 121 |
|
115 | 122 |
if success is not None: |
116 | 123 |
# Success can either be an in or a collection |
b/kamaki/clients/connection/__init__.py | ||
---|---|---|
170 | 170 |
def set_method(self, method): |
171 | 171 |
self.method = method |
172 | 172 |
|
173 |
def perform_request(self, method=None, url=None, params=None, headers=None, data=None):
|
|
173 |
def perform_request(self, method=None, url=None, async_headers={}, async_params={}, data=None):
|
|
174 | 174 |
""" |
175 | 175 |
@return an HTTPResponse (also in self.response of this object) |
176 | 176 |
named args offer the ability to reset a request or a part of the request |
b/kamaki/clients/connection/kamakicon.py | ||
---|---|---|
59 | 59 |
for k,v in r.getheaders(): |
60 | 60 |
headers.update({k:v}) |
61 | 61 |
self.headers = headers |
62 |
self.content = r.read() |
|
62 |
from copy import copy |
|
63 |
self.content = copy(r.read()) |
|
64 |
#print('%s %s INSIDE[%s]'%(self.request, self, self.content[1:10])) |
|
63 | 65 |
self.status_code = r.status |
64 | 66 |
self.status = r.reason |
65 | 67 |
self.request.close() |
... | ... | |
90 | 92 |
|
91 | 93 |
class KamakiHTTPConnection(HTTPConnection): |
92 | 94 |
|
93 |
def _retrieve_connection_info(self): |
|
95 |
def _retrieve_connection_info(self, extra_params={}):
|
|
94 | 96 |
""" return (scheme, netloc, url?with¶ms) """ |
95 | 97 |
url = self.url |
96 |
for i,(key, val) in enumerate(self.params.items()): |
|
98 |
params = self.params |
|
99 |
for k,v in extra_params.items(): |
|
100 |
params[k] = v |
|
101 |
for i,(key, val) in enumerate(params.items()): |
|
97 | 102 |
param_str = ('?' if i == 0 else '&') + unicode(key) |
98 | 103 |
if val is not None: |
99 | 104 |
param_str+= '='+unicode(val) |
... | ... | |
103 | 108 |
self.url = url |
104 | 109 |
return (parsed.scheme, parsed.netloc) |
105 | 110 |
|
106 |
def perform_request(self, method=None, data=None): |
|
107 |
(scheme, netloc) = self._retrieve_connection_info() |
|
111 |
def perform_request(self, method=None, data=None, async_headers={}, async_params={}):
|
|
112 |
(scheme, netloc) = self._retrieve_connection_info(extra_params=async_params)
|
|
108 | 113 |
#get connection from pool |
109 | 114 |
conn = get_http_connection(netloc=netloc, scheme=scheme) |
115 |
headers = self.headers |
|
116 |
for k,v in async_headers.items(): |
|
117 |
headers[k] = v |
|
110 | 118 |
try: |
111 |
conn.request(method = method.upper(), url=self.url, headers=self.headers, body=data)
|
|
119 |
conn.request(method = method.upper(), url=self.url, headers=headers, body=data) |
|
112 | 120 |
except: |
113 | 121 |
conn.close() |
114 | 122 |
raise |
115 | 123 |
return KamakiHTTPResponse(conn) |
124 |
|
|
125 |
|
b/kamaki/clients/pithos.py | ||
---|---|---|
244 | 244 |
file_hashmap = HashMap(blocksize, blockhash) |
245 | 245 |
file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen')) |
246 | 246 |
|
247 |
#filter out blocks that are already downloaded |
|
248 | 247 |
for i, x in enumerate(file_hashmap): |
249 | 248 |
local_hash = hexlify(x) |
250 | 249 |
if local_hash in remote_hashes: |
... | ... | |
263 | 262 |
gevent.Greenlet._report_error(self, exc_info) |
264 | 263 |
finally: |
265 | 264 |
sys.stderr = _stderr |
266 |
self.POOL_SIZE = 5 |
|
265 |
if not hasattr(self, 'POOL_SIZE'): |
|
266 |
self.POOL_SIZE = 5 |
|
267 | 267 |
if self.async_pool is None: |
268 | 268 |
self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE) |
269 | 269 |
g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs) |
... | ... | |
273 | 273 |
def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs): |
274 | 274 |
finished = [] |
275 | 275 |
for start, g in flying_greenlets.items(): |
276 |
print('\tIs g ID(%s) ready? %s'%(self.mmaapp[start], g.ready())) |
|
276 | 277 |
if g.ready(): |
277 | 278 |
if g.exception: |
278 |
g.release() |
|
279 | 279 |
raise g.exception |
280 | 280 |
try: |
281 | 281 |
block = g.value.content |
282 | 282 |
except AttributeError: |
283 | 283 |
broken[start] = flying_greenlets.pop(start) |
284 |
#g.spawn() |
|
284 | 285 |
continue |
285 | 286 |
local_file.seek(start) |
287 |
print('\tID(%s) [%s...]\n\tg.value:%s\n\tg:%s\n'%(self.mmaapp[start], block[1:10], |
|
288 |
g.value, g)) |
|
289 |
print('\tID(%s): g.value.request: %s\n---'%(self.mmaapp[start], g.value.request)) |
|
286 | 290 |
local_file.write(block) |
287 | 291 |
#local_file.flush() |
288 | 292 |
self._cb_next() |
... | ... | |
291 | 295 |
return finished |
292 | 296 |
|
293 | 297 |
def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs): |
294 |
|
|
295 |
#let the fly |
|
296 | 298 |
flying_greenlets = {} |
297 | 299 |
finished_greenlets = [] |
298 | 300 |
broken = {} |
301 |
self.mmaapp = {} |
|
299 | 302 |
for block_hash, blockid in remote_hashes.items(): |
303 |
if len(flying_greenlets) >= self.POOL_SIZE: |
|
304 |
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken, |
|
305 |
**restargs) |
|
300 | 306 |
start = blocksize*blockid |
307 |
self.mmaapp[start] = blockid |
|
301 | 308 |
end = total_size-1 if start+blocksize > total_size else start+blocksize-1 |
302 |
restargs['data_range'] = 'bytes=%s-%s'%(start, end)
|
|
303 |
#store info for relaunching greenlet if needed
|
|
309 |
restargs['async_headers'] = dict(data_range='bytes=%s-%s'%(start, end))
|
|
310 |
print('ID(%s) get_grnlt {'%blockid)
|
|
304 | 311 |
flying_greenlets[start] = self._get_block_async(obj, **restargs) |
305 |
finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken, |
|
306 |
**restargs) |
|
312 |
print('ID(%s) got_grnlt }'%blockid) |
|
307 | 313 |
|
308 | 314 |
#check the greenlets |
309 | 315 |
while len(flying_greenlets) > 0: |
... | ... | |
333 | 339 |
hash_list, |
334 | 340 |
remote_hashes) = self._get_remote_blocks_info(obj, **restargs) |
335 | 341 |
assert total_size >= 0 |
342 |
self.POOL_SIZE = 5 |
|
336 | 343 |
|
337 | 344 |
if download_cb: |
338 | 345 |
self.progress_bar_gen = download_cb(len(remote_hashes)+1) |
b/kamaki/clients/pithos_rest_api.py | ||
---|---|---|
282 | 282 |
success = kwargs.pop('success', 200) |
283 | 283 |
return self.head(path, *args, success=success, **kwargs) |
284 | 284 |
|
285 |
def object_get(self, object, format='json', hashmap=False, version=None, binary=False,
|
|
285 |
def object_get(self, object, format='json', hashmap=False, version=None, |
|
286 | 286 |
data_range=None, if_range=False, if_etag_match=None, if_etag_not_match = None, |
287 | 287 |
if_modified_since = None, if_unmodified_since = None, *args, **kwargs): |
288 | 288 |
""" Full Pithos+ GET at object level |
... | ... | |
315 | 315 |
|
316 | 316 |
path=path4url(self.account, self.container, object) |
317 | 317 |
success = kwargs.pop('success', 200) |
318 |
if binary: |
|
319 |
kwargs['binary'] = True |
|
320 | 318 |
return self.get(path, *args, success=success, **kwargs) |
321 | 319 |
|
322 | 320 |
def object_put(self, object, format='json', hashmap=False, delimiter = None, if_etag_match=None, |
Also available in: Unified diff