Revision 3dabe5d2 kamaki/clients/pithos.py
old | new | b/kamaki/clients/pithos.py
---|---|---
37 | 37 | #gevent.monkey.patch_all()
38 | 38 | import gevent.pool
39 | 39 | 
40 | | - from os import fstat, path
 | 40 | + from os import fstat
41 | 41 | from hashlib import new as newhashlib
42 | 42 | from time import time, sleep
43 | | - from datetime import datetime
44 | 43 | import sys
45 | 44 | 
46 | 45 | from binascii import hexlify
... | ... | 
50 | 49 | from kamaki.clients.utils import path4url, filter_in
51 | 50 | from StringIO import StringIO
52 | 51 | 
 | 52 | + 
53 | 53 | def pithos_hash(block, blockhash):
54 | 54 |     h = newhashlib(blockhash)
55 | 55 |     h.update(block.rstrip('\x00'))
56 | 56 |     return h.hexdigest()
57 | 57 | 
 | 58 | + 
58 | 59 | def _range_up(start, end, a_range):
59 | 60 |     if a_range:
60 | 61 |         (rstart, rend) = a_range.split('-')
61 | 62 |         (rstart, rend) = (int(rstart), int(rend))
62 | 63 |         if rstart > end or rend < start:
63 | | -             return (0,0)
 | 64 | +             return (0, 0)
64 | 65 |         if rstart > start:
65 | 66 |             start = rstart
66 | 67 |         if rend < end:
67 | 68 |             end = rend
68 | 69 |     return (start, end)
69 | 70 | 
 | 71 | + 
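
For readers skimming the revision: `pithos_hash` implements the Pithos block-hash convention (trailing NUL padding is stripped before hashing) and `_range_up` clamps a block's `(start, end)` against a user-supplied `'rstart-rend'` range. A standalone sketch of both, runnable without a server:

```python
# Standalone sketch of the two module-level helpers above (Python 2).
from hashlib import new as newhashlib

def pithos_hash(block, blockhash):
    h = newhashlib(blockhash)          # e.g. blockhash = 'sha256'
    h.update(block.rstrip('\x00'))     # zero padding does not affect the hash
    return h.hexdigest()

# blocks that differ only in trailing NUL padding hash identically:
assert pithos_hash('abc\x00\x00', 'sha256') == pithos_hash('abc', 'sha256')

# _range_up clamps (start, end) to an 'rstart-rend' range string;
# (0, 0) signals a block falling entirely outside the range, e.g.:
#   _range_up(0, 99, '50-80')    -> (50, 80)
#   _range_up(100, 199, '50-80') -> (0, 0)
```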
70 | 72 | class PithosClient(PithosRestAPI):
71 | 73 |     """GRNet Pithos API client"""
72 | 74 | 
73 | | -     def __init__(self, base_url, token, account=None, container = None):
74 | | -         super(PithosClient, self).__init__(base_url, token, account = account,
75 | | -             container = container)
 | 75 | +     def __init__(self, base_url, token, account=None, container=None):
 | 76 | +         super(PithosClient, self).__init__(base_url, token, account, container)
76 | 77 |         self.async_pool = None
77 | 78 | 
78 | 79 |     def purge_container(self):
79 | 80 |         r = self.container_delete(until=unicode(time()))
80 | 81 |         r.release()
81 | | - 
82 | | -     def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
83 | | -         content_encoding=None, content_disposition=None, content_type=None, sharing=None,
 | 82 | + 
 | 83 | +     def upload_object_unchunked(self, obj, f,
 | 84 | +         withHashFile=False,
 | 85 | +         size=None,
 | 86 | +         etag=None,
 | 87 | +         content_encoding=None,
 | 88 | +         content_disposition=None,
 | 89 | +         content_type=None,
 | 90 | +         sharing=None,
84 | 91 |         public=None):
85 | | -         # This is a naive implementation, it loads the whole file in memory
86 | | -         #Look in pithos for a nice implementation
87 | 92 |         self.assert_container()
88 | 93 | 
89 | 94 |         if withHashFile:
... | ... | 
92 | 97 |             import json
93 | 98 |             data = json.dumps(json.loads(data))
94 | 99 |         except ValueError:
95 | | -             raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
 | 100 | +             raise ClientError(message='"%s" is not json-formated' % f.name,
 | 101 | +                 status=1)
96 | 102 |         except SyntaxError:
97 | | -             raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
 | 103 | +             raise ClientError(message='"%s" is not a valid hashmap file'\
 | 104 | +                 % f.name, status=1)
98 | 105 |         f = StringIO(data)
99 | 106 |         data = f.read(size) if size is not None else f.read()
100 | | -         r = self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
101 | | -             content_disposition=content_disposition, content_type=content_type, permitions=sharing,
102 | | -             public=public, success=201)
 | 107 | +         r = self.object_put(obj,
 | 108 | +             data=data,
 | 109 | +             etag=etag,
 | 110 | +             content_encoding=content_encoding,
 | 111 | +             content_disposition=content_disposition,
 | 112 | +             content_type=content_type,
 | 113 | +             permissions=sharing,
 | 114 | +             public=public,
 | 115 | +             success=201)
103 | 116 |         r.release()
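
A minimal usage sketch for the reworked `upload_object_unchunked` (not part of the revision); the endpoint, token, account and container are placeholders. As the removed comment warned, this path loads the whole file into memory:

```python
# Usage sketch only - PITHOS_URL and TOKEN are placeholders.
from kamaki.clients.pithos import PithosClient

client = PithosClient('https://PITHOS_URL/v1', 'TOKEN',
    account='user@example.com', container='pithos')
with open('notes.txt') as f:
    # whole file is read into memory before the PUT
    client.upload_object_unchunked('notes.txt', f, content_type='text/plain')
```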
104 | | - 
105 | | - #upload_* auxiliary methods
 | 117 | + 
 | 118 | + # upload_* auxiliary methods
106 | 119 |     def put_block_async(self, data, hash):
107 | 120 |         class SilentGreenlet(gevent.Greenlet):
108 | 121 |             def _report_error(self, exc_info):
... | ... | 
111 | 124 |                     gevent.Greenlet._report_error(self, exc_info)
112 | 125 |                 finally:
113 | 126 |                     if hasattr(sys, '_stderr'):
114 | | -                         sys.stderr = _stderr
 | 127 | +                         sys.stderr = sys._stderr
115 | 128 |         POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
116 | 129 |         if self.async_pool is None:
117 | 130 |             self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
... | ... | 
120 | 133 |         return g
121 | 134 | 
122 | 135 |     def put_block(self, data, hash):
123 | | -         r = self.container_post(update=True, content_type='application/octet-stream',
124 | | -             content_length=len(data), data=data, format='json')
 | 136 | +         r = self.container_post(update=True,
 | 137 | +             content_type='application/octet-stream',
 | 138 | +             content_length=len(data),
 | 139 | +             data=data,
 | 140 | +             format='json')
125 | 141 |         assert r.json[0] == hash, 'Local hash does not match server'
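
The contract `put_block` relies on, sketched below under an assumption: the container is expected to also advertise an `x-container-block-hash` header (as the `_get_file_block_info` lookups further down suggest), and the block must hash, after NUL-stripping, to the value the update POST echoes back:

```python
# Sketch of the put_block contract; 'client' as in the previous example.
from kamaki.clients.pithos import pithos_hash

meta = client.get_container_info()
blockhash = meta['x-container-block-hash']   # assumed header, e.g. 'sha256'
data = 'some block payload'
# put_block asserts that the server returns the same hash we computed:
client.put_block(data, pithos_hash(data, blockhash))
```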
126 | | - 
127 | | -     def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
128 | | -         content_disposition=None, content_type=None, sharing=None, public=None):
 | 142 | + 
 | 143 | +     def create_object_by_manifestation(self, obj,
 | 144 | +         etag=None,
 | 145 | +         content_encoding=None,
 | 146 | +         content_disposition=None,
 | 147 | +         content_type=None,
 | 148 | +         sharing=None,
 | 149 | +         public=None):
129 | 150 |         self.assert_container()
130 | | -         obj_content_type = 'application/octet-stream' if content_type is None else content_type
131 | | -         r = self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
132 | | -             content_disposition=content_disposition, content_type=content_type, permitions=sharing,
133 | | -             public=public, manifest='%s/%s'%(self.container,obj))
 | 151 | +         r = self.object_put(obj,
 | 152 | +             content_length=0,
 | 153 | +             etag=etag,
 | 154 | +             content_encoding=content_encoding,
 | 155 | +             content_disposition=content_disposition,
 | 156 | +             content_type=content_type,
 | 157 | +             permissions=sharing,
 | 158 | +             public=public,
 | 159 | +             manifest='%s/%s' % (self.container, obj))
134 | 160 |         r.release()
135 | | - 
 | 161 | + 
136 | 162 |     def _get_file_block_info(self, fileobj, size=None):
137 | 163 |         meta = self.get_container_info()
138 | 164 |         blocksize = int(meta['x-container-block-size'])
... | ... | 
141 | 167 |         nblocks = 1 + (size - 1) // blocksize
142 | 168 |         return (blocksize, blockhash, size, nblocks)
143 | 169 | 
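
The `1 + (size - 1) // blocksize` expression above is integer ceiling division for positive sizes; a quick self-check:

```python
# ceiling-division self-check (valid for size >= 1)
blocksize = 4096
for size in (1, 4095, 4096, 4097):
    assert 1 + (size - 1) // blocksize == -(-size // blocksize)
```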
144 | | -     def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
145 | | -         content_type=None, etag=None, content_encoding=None, content_disposition=None,
146 | | -         permitions=None, public=None, success=(201, 409)):
147 | | -         r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
148 | | -             json=json, etag=etag, content_encoding=content_encoding,
149 | | -             content_disposition=content_disposition, permitions=permitions, public=public,
 | 170 | +     def _get_missing_hashes(self, obj, json,
 | 171 | +         size=None,
 | 172 | +         format='json',
 | 173 | +         hashmap=True,
 | 174 | +         content_type=None,
 | 175 | +         etag=None,
 | 176 | +         content_encoding=None,
 | 177 | +         content_disposition=None,
 | 178 | +         permissions=None,
 | 179 | +         public=None,
 | 180 | +         success=(201, 409)):
 | 181 | +         r = self.object_put(obj,
 | 182 | +             format='json',
 | 183 | +             hashmap=True,
 | 184 | +             content_type=content_type,
 | 185 | +             json=json,
 | 186 | +             etag=etag,
 | 187 | +             content_encoding=content_encoding,
 | 188 | +             content_disposition=content_disposition,
 | 189 | +             permissions=permissions,
 | 190 | +             public=public,
150 | 191 |             success=success)
151 | 192 |         if r.status_code == 201:
152 | 193 |             r.release()
153 | 194 |             return None
154 | 195 |         return r.json
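
The 201/409 exchange this helper implements, as exercised by this file: the object is PUT as a hashmap (`format=json`, `hashmap=True`); 201 means every block already exists server-side, while 409 carries back the hashes the server still lacks. A hedged sketch (the hash strings are made-up placeholders):

```python
# Illustrative only: hash values are placeholders, not real digests.
hashmap = dict(bytes=8388608,            # total object size in bytes
    hashes=['f00d...', 'cafe...'])       # pithos_hash of each block, in order
missing = client._get_missing_hashes('movie.avi', hashmap, size=8388608)
if missing is None:
    pass  # server replied 201: all blocks known, object is complete
else:
    pass  # server replied 409: 'missing' lists hashes to send via put_block
```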
155 | 196 | 
156 | | -     def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
 | 197 | +     def _caclulate_uploaded_blocks(self,
 | 198 | +         blocksize,
 | 199 | +         blockhash,
 | 200 | +         size,
 | 201 | +         nblocks,
 | 202 | +         hashes,
 | 203 | +         hmap,
 | 204 | +         fileobj,
157 | 205 |         hash_cb=None):
158 | | -         offset=0
 | 206 | +         offset = 0
159 | 207 |         if hash_cb:
160 | 208 |             hash_gen = hash_cb(nblocks)
161 | 209 |             hash_gen.next()
... | ... | 
172 | 220 |         assert offset == size
173 | 221 | 
174 | 222 |     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
175 | | -         """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
 | 223 | +         """upload missing blocks asynchronously. Use greenlets to avoid waiting
176 | 224 |         """
177 | 225 |         if upload_cb:
178 | 226 |             upload_gen = upload_cb(len(missing))
... | ... | 
201 | 249 | 
202 | 250 |         failures = [r for r in flying if r.exception]
203 | 251 |         if len(failures):
204 | | -             details = ', '.join(['(%s).%s'%(i,r.exception) for i,r in enumerate(failures)])
205 | | -             raise ClientError(message="Block uploading failed", status=505, details=details)
206 | | - 
207 | | -     def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
208 | | -         content_encoding=None, content_disposition=None, content_type=None, sharing=None,
 | 252 | +             details = ', '.join(['(%s).%s' % (i, r.exception)\
 | 253 | +                 for i, r in enumerate(failures)])
 | 254 | +             raise ClientError(message="Block uploading failed",
 | 255 | +                 status=505,
 | 256 | +                 details=details)
 | 257 | + 
 | 258 | +     def upload_object(self, obj, f,
 | 259 | +         size=None,
 | 260 | +         hash_cb=None,
 | 261 | +         upload_cb=None,
 | 262 | +         etag=None,
 | 263 | +         content_encoding=None,
 | 264 | +         content_disposition=None,
 | 265 | +         content_type=None,
 | 266 | +         sharing=None,
209 | 267 |         public=None):
210 | 268 |         self.assert_container()
211 | 269 | 
212 | 270 |         #init
213 | | -         block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
 | 271 | +         block_info = (blocksize, blockhash, size, nblocks) =\
 | 272 | +             self._get_file_block_info(f, size)
214 | 273 |         (hashes, hmap, offset) = ([], {}, 0)
215 | | -         content_type = 'application/octet-stream' if content_type is None else content_type
 | 274 | +         if content_type is None:
 | 275 | +             content_type = 'application/octet-stream'
216 | 276 | 
217 | | -         self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
 | 277 | +         self._caclulate_uploaded_blocks(*block_info,
 | 278 | +             hashes=hashes,
 | 279 | +             hmap=hmap,
 | 280 | +             fileobj=f,
218 | 281 |             hash_cb=hash_cb)
219 | 282 | 
220 | 283 |         hashmap = dict(bytes=size, hashes=hashes)
221 | | -         missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
222 | | -             etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
223 | | -             permitions=sharing, public=public)
 | 284 | +         missing = self._get_missing_hashes(obj, hashmap,
 | 285 | +             content_type=content_type,
 | 286 | +             size=size,
 | 287 | +             etag=etag,
 | 288 | +             content_encoding=content_encoding,
 | 289 | +             content_disposition=content_disposition,
 | 290 | +             permissions=sharing,
 | 291 | +             public=public)
224 | 292 | 
225 | 293 |         if missing is None:
226 | 294 |             return
227 | 295 |         self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
228 | 296 | 
229 | | -         r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
230 | | -             json=hashmap, success=201)
 | 297 | +         r = self.object_put(obj,
 | 298 | +             format='json',
 | 299 | +             hashmap=True,
 | 300 | +             content_type=content_type,
 | 301 | +             json=hashmap,
 | 302 | +             success=201)
231 | 303 |         r.release()
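
A usage sketch for the chunked `upload_object`. The callback contract, as inferred from this file, is: `hash_cb` and `upload_cb` receive a block count and must return a generator whose `next()` is called once up front and then once per block:

```python
# Usage sketch only; 'client' and file names are placeholders as before.
def progress(nblocks):
    def gen():
        for i in range(nblocks + 1):  # +1: the client primes the generator
            yield
    return gen()

with open('movie.avi', 'rb') as f:
    client.upload_object('movie.avi', f,
        hash_cb=progress,
        upload_cb=progress)
```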
232 | | - 
 | 304 | + 
233 | 305 |     #download_* auxiliary methods
234 | 306 |     #ALl untested
235 | 307 |     def _get_remote_blocks_info(self, obj, **restargs):
236 | 308 |         #retrieve object hashmap
237 | | -         myrange = restargs.pop('data_range') if 'data_range' in restargs.keys() else None
 | 309 | +         myrange = restargs.pop('data_range', None)
238 | 310 |         hashmap = self.get_object_hashmap(obj, **restargs)
239 | 311 |         restargs['data_range'] = myrange
240 | 312 |         blocksize = int(hashmap['block_size'])
... | ... | 
246 | 318 |             map_dict[h] = i
247 | 319 |         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
248 | 320 | 
def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs): |
|
321 |
def _dump_blocks_sync(self, |
|
322 |
obj, |
|
323 |
remote_hashes, |
|
324 |
blocksize, |
|
325 |
total_size, |
|
326 |
dst, |
|
327 |
range, |
|
328 |
**restargs): |
|
250 | 329 |
for blockid, blockhash in enumerate(remote_hashes): |
251 | 330 |
if blockhash == None: |
252 | 331 |
continue |
253 |
start = blocksize*blockid |
|
254 |
end = total_size-1 if start+blocksize > total_size else start+blocksize-1 |
|
332 |
start = blocksize * blockid |
|
333 |
end = total_size - 1 if start + blocksize > total_size\ |
|
334 |
else start + blocksize - 1 |
|
255 | 335 |
(start, end) = _range_up(start, end, range) |
256 |
restargs['data_range'] = 'bytes=%s-%s'%(start, end)
|
|
336 |
restargs['data_range'] = 'bytes=%s-%s' % (start, end)
|
|
257 | 337 |
r = self.object_get(obj, success=(200, 206), **restargs) |
258 | 338 |
self._cb_next() |
259 | 339 |
dst.write(r.content) |
... | ... | |
272 | 352 |         self.POOL_SIZE = 5
273 | 353 |         if self.async_pool is None:
274 | 354 |             self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
275 | | -         g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
 | 355 | +         g = SilentGreenlet(self.object_get, obj,
 | 356 | +             success=(200, 206),
 | 357 | +             **restargs)
276 | 358 |         self.async_pool.start(g)
277 | 359 |         return g
278 | 360 | 
... | ... | 
283 | 365 |             h.update(block.strip('\x00'))
284 | 366 |             return hexlify(h.digest())
285 | 367 | 
286 | | -     def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
 | 368 | +     def _greenlet2file(self,
 | 369 | +         flying_greenlets,
 | 370 | +         local_file,
 | 371 | +         offset=0,
 | 372 | +         **restargs):
287 | 373 |         """write the results of a greenleted rest call to a file
288 | | -         @offset: the offset of the file up to blocksize - e.g. if the range is 10-100, all
 | 374 | +         @offset: the offset of the file up to blocksize
 | 375 | +             - e.g. if the range is 10-100, all
289 | 376 |         blocks will be written to normal_position - 10"""
290 | 377 |         finished = []
291 | 378 |         for start, g in flying_greenlets.items():
... | ... | 
300 | 387 |         local_file.flush()
301 | 388 |         return finished
302 | 389 | 
303 | | -     def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
304 | | -         blockhash=None, resume=False, filerange = None, **restargs):
 | 390 | +     def _dump_blocks_async(self,
 | 391 | +         obj,
 | 392 | +         remote_hashes,
 | 393 | +         blocksize,
 | 394 | +         total_size,
 | 395 | +         local_file,
 | 396 | +         blockhash=None,
 | 397 | +         resume=False,
 | 398 | +         filerange=None,
 | 399 | +         **restargs):
305 | 400 | 
306 | 401 |         file_size = fstat(local_file.fileno()).st_size if resume else 0
307 | 402 |         flying_greenlets = {}
... | ... | 
309 | 404 |         offset = 0
310 | 405 |         if filerange is not None:
311 | 406 |             rstart = int(filerange.split('-')[0])
312 | | -             offset = rstart if blocksize > rstart else rstart%blocksize
 | 407 | +             offset = rstart if blocksize > rstart else rstart % blocksize
313 | 408 |         for block_hash, blockid in remote_hashes.items():
314 | | -             start = blocksize*blockid
315 | | -             if start < file_size and block_hash == self._hash_from_file(local_file,
316 | | -                 start, blocksize, blockhash):
317 | | -                 self._cb_next()
318 | | -                 continue
 | 409 | +             start = blocksize * blockid
 | 410 | +             if start < file_size\
 | 411 | +                 and block_hash == self._hash_from_file(local_file,
 | 412 | +                     start,
 | 413 | +                     blocksize,
 | 414 | +                     blockhash):
 | 415 | +                 self._cb_next()
 | 416 | +                 continue
319 | 417 |             if len(flying_greenlets) >= self.POOL_SIZE:
320 | | -                 finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
 | 418 | +                 finished_greenlets += self._greenlet2file(flying_greenlets,
 | 419 | +                     local_file,
 | 420 | +                     offset,
321 | 421 |                     **restargs)
322 | | -             end = total_size-1 if start+blocksize > total_size else start+blocksize-1
 | 422 | +             end = total_size - 1 if start + blocksize > total_size\
 | 423 | +                 else start + blocksize - 1
323 | 424 |             (start, end) = _range_up(start, end, filerange)
324 | 425 |             if start == end:
325 | 426 |                 self._cb_next()
326 | 427 |                 continue
327 | | -             restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
 | 428 | +             restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
328 | 429 |             flying_greenlets[start] = self._get_block_async(obj, **restargs)
329 | 430 | 
330 | 431 |         #check the greenlets
331 | 432 |         while len(flying_greenlets) > 0:
332 | 433 |             sleep(0.001)
333 | | -             finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
 | 434 | +             finished_greenlets += self._greenlet2file(flying_greenlets,
 | 435 | +                 local_file,
 | 436 | +                 offset,
334 | 437 |                 **restargs)
335 | 438 | 
336 | 439 |         gevent.joinall(finished_greenlets)
337 | 440 | 
def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False, |
|
339 |
range=None, if_match=None, if_none_match=None, if_modified_since=None, |
|
441 |
def download_object(self, |
|
442 |
obj, |
|
443 |
dst, |
|
444 |
download_cb=None, |
|
445 |
version=None, |
|
446 |
overide=False, |
|
447 |
resume=False, |
|
448 |
range=None, |
|
449 |
if_match=None, |
|
450 |
if_none_match=None, |
|
451 |
if_modified_since=None, |
|
340 | 452 |
if_unmodified_since=None): |
341 | 453 |
|
342 |
restargs=dict(version=version,
|
|
343 |
data_range = None if range is None else 'bytes=%s'%range,
|
|
454 |
restargs = dict(version=version,
|
|
455 |
data_range=None if range is None else 'bytes=%s' % range,
|
|
344 | 456 |
if_match=if_match, |
345 | 457 |
if_none_match=if_none_match, |
346 | 458 |
if_modified_since=if_modified_since, |
347 | 459 |
if_unmodified_since=if_unmodified_since) |
348 | 460 |
|
349 |
( blocksize,
|
|
461 |
(blocksize, |
|
350 | 462 |
blockhash, |
351 | 463 |
total_size, |
352 |
hash_list,
|
|
464 |
hash_list, |
|
353 | 465 |
remote_hashes) = self._get_remote_blocks_info(obj, **restargs) |
354 | 466 |
assert total_size >= 0 |
355 | 467 |
self.POOL_SIZE = 5 |
... | ... | |
359 | 471 |
self._cb_next() |
360 | 472 |
|
361 | 473 |
if dst.isatty(): |
362 |
self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs) |
|
474 |
self._dump_blocks_sync(obj, |
|
475 |
hash_list, |
|
476 |
blocksize, |
|
477 |
total_size, |
|
478 |
dst, |
|
479 |
range, |
|
480 |
**restargs) |
|
363 | 481 |
else: |
364 |
self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash, |
|
365 |
resume, range, **restargs) |
|
482 |
self._dump_blocks_async(obj, |
|
483 |
remote_hashes, |
|
484 |
blocksize, |
|
485 |
total_size, |
|
486 |
dst, |
|
487 |
blockhash, |
|
488 |
resume, |
|
489 |
range, |
|
490 |
**restargs) |
|
366 | 491 |
if range is None: |
367 | 492 |
dst.truncate(total_size) |
368 | 493 |
|
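
A usage sketch for `download_object`: a tty destination downloads blocks sequentially, anything else takes the greenlet path. The file mode here is an assumption, chosen so that `resume` can compare the existing bytes:

```python
# Usage sketch only: resume an interrupted download into an existing file.
with open('movie.avi', 'rb+') as dst:  # not a tty, so the async path is used
    client.download_object('movie.avi', dst, resume=True)
```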
... | ... | 
375 | 500 |                 self.progress_bar_gen.next()
376 | 501 |             except:
377 | 502 |                 pass
 | 503 | + 
378 | 504 |     def _complete_cb(self):
379 | 505 |         while True:
380 | 506 |             try:
... | ... | 
382 | 508 |             except:
383 | 509 |                 break
384 | 510 | 
385 | | -     #Untested - except is download_object is tested first
386 | | -     def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
387 | | -         if_modified_since=None, if_unmodified_since=None, data_range=None):
 | 511 | +     # Untested - except is download_object is tested first
 | 512 | +     def get_object_hashmap(self, obj,
 | 513 | +         version=None,
 | 514 | +         if_match=None,
 | 515 | +         if_none_match=None,
 | 516 | +         if_modified_since=None,
 | 517 | +         if_unmodified_since=None,
 | 518 | +         data_range=None):
388 | 519 |         try:
389 | | -             r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
390 | | -                 if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
391 | | -                 if_unmodified_since=if_unmodified_since, data_range=data_range)
 | 520 | +             r = self.object_get(obj,
 | 521 | +                 hashmap=True,
 | 522 | +                 version=version,
 | 523 | +                 if_etag_match=if_match,
 | 524 | +                 if_etag_not_match=if_none_match,
 | 525 | +                 if_modified_since=if_modified_since,
 | 526 | +                 if_unmodified_since=if_unmodified_since,
 | 527 | +                 data_range=data_range)
392 | 528 |         except ClientError as err:
393 | 529 |             if err.status == 304 or err.status == 412:
394 | 530 |                 return {}
... | ... | 
396 | 532 |             return r.json
397 | 533 | 
398 | 534 |     def set_account_group(self, group, usernames):
399 | | -         r = self.account_post(update=True, groups = {group:usernames})
 | 535 | +         r = self.account_post(update=True, groups={group: usernames})
400 | 536 |         r.release()
401 | 537 | 
402 | 538 |     def del_account_group(self, group):
403 | | -         r = self.account_post(update=True, groups={group:[]})
 | 539 | +         r = self.account_post(update=True, groups={group: []})
404 | 540 |         r.release()
405 | 541 | 
406 | 542 |     def get_account_info(self, until=None):
... | ... | 
410 | 546 |         return r.headers
411 | 547 | 
412 | 548 |     def get_account_quota(self):
413 | | -         return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
 | 549 | +         return filter_in(self.get_account_info(),
 | 550 | +             'X-Account-Policy-Quota',
 | 551 | +             exactMatch=True)
414 | 552 | 
415 | 553 |     def get_account_versioning(self):
416 | | -         return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
 | 554 | +         return filter_in(self.get_account_info(),
 | 555 | +             'X-Account-Policy-Versioning',
 | 556 | +             exactMatch=True)
417 | 557 | 
418 | 558 |     def get_account_meta(self, until=None):
419 | | -         return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
 | 559 | +         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
420 | 560 | 
421 | 561 |     def get_account_group(self):
422 | 562 |         return filter_in(self.get_account_info(), 'X-Account-Group-')
... | ... | 
r.release() |
428 | 568 |
|
429 | 569 |
def del_account_meta(self, metakey): |
430 |
r = self.account_post(update=True, metadata={metakey:''}) |
|
570 |
r = self.account_post(update=True, metadata={metakey: ''})
|
|
431 | 571 |
r.release() |
432 | 572 |
|
433 | 573 |
def set_account_quota(self, quota): |
... | ... | |
435 | 575 |
r.release() |
436 | 576 |
|
437 | 577 |
def set_account_versioning(self, versioning): |
438 |
r = self.account_post(update=True, versioning = versioning)
|
|
578 |
r = self.account_post(update=True, versioning=versioning)
|
|
439 | 579 |
r.release() |
440 | 580 |
|
441 | 581 |
def list_containers(self): |
... | ... | |
444 | 584 |
|
445 | 585 |
def del_container(self, until=None, delimiter=None): |
446 | 586 |
self.assert_container() |
447 |
r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409)) |
|
587 |
r = self.container_delete(until=until, |
|
588 |
delimiter=delimiter, |
|
589 |
success=(204, 404, 409)) |
|
448 | 590 |
r.release() |
449 | 591 |
if r.status_code == 404: |
450 |
raise ClientError('Container "%s" does not exist'%self.container, r.status_code) |
|
592 |
raise ClientError('Container "%s" does not exist' % self.container, |
|
593 |
r.status_code) |
|
451 | 594 |
elif r.status_code == 409: |
452 |
raise ClientError('Container "%s" is not empty'%self.container, r.status_code) |
|
595 |
raise ClientError('Container "%s" is not empty' % self.container, |
|
596 |
r.status_code) |
|
453 | 597 |
|
454 | 598 |
def get_container_versioning(self, container): |
455 | 599 |
self.container = container |
456 |
return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning') |
|
600 |
return filter_in(self.get_container_info(), |
|
601 |
'X-Container-Policy-Versioning') |
|
457 | 602 |
|
458 | 603 |
def get_container_quota(self, container): |
459 | 604 |
self.container = container |
460 | 605 |
return filter_in(self.get_container_info(), 'X-Container-Policy-Quota') |
461 | 606 |
|
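
`del_container` tolerates 404/409 as HTTP outcomes (`success=(204, 404, 409)`) and then re-raises them as typed `ClientError`s, so callers can branch on `.status` (a sketch, assuming `ClientError` is importable from kamaki.clients, as this module's `except ClientError` suggests):

```python
from kamaki.clients import ClientError

try:
    client.del_container()
except ClientError as ce:
    if ce.status == 409:
        pass  # container not empty; empty it first or use a delimiter
    elif ce.status == 404:
        pass  # container already gone
    else:
        raise
```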
462 | | -     def get_container_info(self, until = None):
 | 607 | +     def get_container_info(self, until=None):
463 | 608 |         r = self.container_head(until=until)
464 | 609 |         return r.headers
465 | 610 | 
466 | | -     def get_container_meta(self, until = None):
467 | | -         return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
 | 611 | +     def get_container_meta(self, until=None):
 | 612 | +         return filter_in(self.get_container_info(until=until),
 | 613 | +             'X-Container-Meta')
468 | 614 | 
469 | | -     def get_container_object_meta(self, until = None):
470 | | -         return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
 | 615 | +     def get_container_object_meta(self, until=None):
 | 616 | +         return filter_in(self.get_container_info(until=until),
 | 617 | +             'X-Container-Object-Meta')
471 | 618 | 
472 | 619 |     def set_container_meta(self, metapairs):
473 | 620 |         assert(type(metapairs) is dict)
474 | 621 |         r = self.container_post(update=True, metadata=metapairs)
475 | 622 |         r.release()
476 | | - 
 | 623 | + 
477 | 624 |     def del_container_meta(self, metakey):
478 | | -         r = self.container_post(update=True, metadata={metakey:''})
 | 625 | +         r = self.container_post(update=True, metadata={metakey: ''})
479 | 626 |         r.release()
480 | 627 | 
481 | 628 |     def set_container_quota(self, quota):
... | ... | 
497 | 644 |         r.release()
498 | 645 | 
499 | 646 |     def del_object_meta(self, metakey, object):
500 | | -         r = self.object_post(object, update=True, metadata={metakey:''})
 | 647 | +         r = self.object_post(object, update=True, metadata={metakey: ''})
501 | 648 |         r.release()
502 | 649 | 
503 | 650 |     def publish_object(self, object):
... | ... | 
return r.headers |
514 | 661 |
|
515 | 662 |
def get_object_meta(self, obj, version=None): |
516 |
return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta') |
|
663 |
return filter_in(self.get_object_info(obj, version=version), |
|
664 |
'X-Object-Meta') |
|
517 | 665 |
|
518 | 666 |
def get_object_sharing(self, object): |
519 |
r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True) |
|
667 |
r = filter_in(self.get_object_info(object), |
|
668 |
'X-Object-Sharing', |
|
669 |
exactMatch=True) |
|
520 | 670 |
reply = {} |
521 | 671 |
if len(r) > 0: |
522 | 672 |
perms = r['x-object-sharing'].split(';') |
... | ... | |
529 | 679 |
reply[key] = val |
530 | 680 |
return reply |
531 | 681 |
|
532 |
def set_object_sharing(self, object, read_permition = False, write_permition = False): |
|
682 |
def set_object_sharing(self, object, |
|
683 |
read_permition=False, |
|
684 |
write_permition=False): |
|
533 | 685 |
"""Give read/write permisions to an object. |
534 |
@param object is the object to change sharing permitions onto |
|
535 |
@param read_permition is a list of users and user groups that get read permition for this object |
|
536 |
False means all previous read permitions will be removed |
|
537 |
@param write_perimition is a list of users and user groups to get write permition for this object |
|
538 |
False means all previous read permitions will be removed |
|
686 |
@param object is the object to change sharing permissions |
|
687 |
onto |
|
688 |
@param read_permition is a list of users and user groups that |
|
689 |
get read permition for this object |
|
690 |
False means all previous read permissions |
|
691 |
will be removed |
|
692 |
@param write_perimition is a list of users and user groups to |
|
693 |
get write permition for this object |
|
694 |
False means all previous read permissions |
|
695 |
will be removed |
|
539 | 696 |
""" |
540 |
perms = {} |
|
541 |
perms['read'] = read_permition if isinstance(read_permition, list) else '' |
|
542 |
perms['write'] = write_permition if isinstance(write_permition, list) else '' |
|
543 |
r = self.object_post(object, update=True, permitions=perms) |
|
697 |
perms = dict(read='' if not read_permition else read_permition, |
|
698 |
write='' if not write_permition else write_permition) |
|
699 |
r = self.object_post(object, update=True, permissions=perms) |
|
544 | 700 |
r.release() |
545 | 701 |
|
546 | 702 |
def del_object_sharing(self, object): |
547 | 703 |
self.set_object_sharing(object) |
548 | 704 |
|
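
A usage sketch for the sharing methods (user and group names are placeholders); calling with no lists, as `del_object_sharing` does, clears both directions:

```python
# Usage sketch only; principals are placeholders.
client.set_object_sharing('notes.txt',
    read_permition=['user1@example.com', 'mygroup'],
    write_permition=['user2@example.com'])
print client.get_object_sharing('notes.txt')  # parsed X-Object-Sharing header
client.del_object_sharing('notes.txt')        # resets read and write to ''
```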
549 | | -     def append_object(self, object, source_file, upload_cb = None):
 | 705 | +     def append_object(self, object, source_file, upload_cb=None):
550 | 706 |         """@param upload_db is a generator for showing progress of upload
551 | 707 |         to caller application, e.g. a progress bar. Its next is called
552 | 708 |         whenever a block is uploaded
... | ... | 
555 | 711 |         meta = self.get_container_info()
556 | 712 |         blocksize = int(meta['x-container-block-size'])
557 | 713 |         filesize = fstat(source_file.fileno()).st_size
558 | | -         nblocks = 1 + (filesize - 1)//blocksize
 | 714 | +         nblocks = 1 + (filesize - 1) // blocksize
559 | 715 |         offset = 0
560 | 716 |         if upload_cb is not None:
561 | 717 |             upload_gen = upload_cb(nblocks)
562 | 718 |         for i in range(nblocks):
563 | 719 |             block = source_file.read(min(blocksize, filesize - offset))
564 | 720 |             offset += len(block)
565 | | -             r = self.object_post(object, update=True, content_range='bytes */*',
566 | | -                 content_type='application/octet-stream', content_length=len(block), data=block)
 | 721 | +             r = self.object_post(object,
 | 722 | +                 update=True,
 | 723 | +                 content_range='bytes */*',
 | 724 | +                 content_type='application/octet-stream',
 | 725 | +                 content_length=len(block),
 | 726 | +                 data=block)
567 | 727 |             r.release()
568 | | - 
 | 728 | + 
569 | 729 |             if upload_cb is not None:
570 | 730 |                 upload_gen.next()
571 | 731 | 
572 | 732 |     def truncate_object(self, object, upto_bytes):
573 | | -         r = self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
574 | | -             content_type='application/octet-stream', object_bytes=upto_bytes,
 | 733 | +         r = self.object_post(object,
 | 734 | +             update=True,
 | 735 | +             content_range='bytes 0-%s/*' % upto_bytes,
 | 736 | +             content_type='application/octet-stream',
 | 737 | +             object_bytes=upto_bytes,
575 | 738 |             source_object=path4url(self.container, object))
576 | 739 |         r.release()
577 | 740 | 
578 | | -     def overwrite_object(self, object, start, end, source_file, upload_cb=None):
 | 741 | +     def overwrite_object(self,
 | 742 | +         object,
 | 743 | +         start,
 | 744 | +         end,
 | 745 | +         source_file,
 | 746 | +         upload_cb=None):
579 | 747 |         """Overwrite a part of an object with given source file
580 | | -         @start the part of the remote object to start overwriting from, in bytes
 | 748 | +         @start the part of the remote object to start overwriting from,
 | 749 | +             in bytes
581 | 750 |         @end the part of the remote object to stop overwriting to, in bytes
582 | 751 |         """
583 | 752 |         self.assert_container()
... | ... | 
585 | 754 |         blocksize = int(meta['x-container-block-size'])
586 | 755 |         filesize = fstat(source_file.fileno()).st_size
587 | 756 |         datasize = int(end) - int(start) + 1
588 | | -         nblocks = 1 + (datasize - 1)//blocksize
 | 757 | +         nblocks = 1 + (datasize - 1) // blocksize
589 | 758 |         offset = 0
590 | 759 |         if upload_cb is not None:
591 | 760 |             upload_gen = upload_cb(nblocks)
592 | 761 |         for i in range(nblocks):
593 | | -             block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
 | 762 | +             block = source_file.read(min(blocksize,
 | 763 | +                 filesize - offset,
 | 764 | +                 datasize - offset))
594 | 765 |             offset += len(block)
595 | | -             r = self.object_post(object, update=True, content_type='application/octet-stream',
596 | | -                 content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
 | 766 | +             r = self.object_post(object,
 | 767 | +                 update=True,
 | 768 | +                 content_type='application/octet-stream',
 | 769 | +                 content_length=len(block),
 | 770 | +                 content_range='bytes %s-%s/*' % (start, end),
 | 771 | +                 data=block)
597 | 772 |             r.release()
598 | | - 
 | 773 | + 
599 | 774 |             if upload_cb is not None:
600 | 775 |                 upload_gen.next()
601 | 776 | 
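
A usage sketch for the ranged-update trio, on a hedged reading of this revision: `append_object` streams a local file to the tail, `overwrite_object` patches bytes `start..end`, and `truncate_object` keeps the leading bytes (file names are placeholders):

```python
# Usage sketch only; file and object names are placeholders.
with open('extra.log', 'rb') as src:
    client.append_object('server.log', src)
with open('patch.bin', 'rb') as src:
    client.overwrite_object('server.log', 0, 1023, src)  # rewrite first 1 KiB
client.truncate_object('server.log', 4096)               # keep first 4 KiB
```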
602 | | -     def copy_object(self, src_container, src_object, dst_container, dst_object=False,
603 | | -         source_version = None, public=False, content_type=None, delimiter=None):
 | 777 | +     def copy_object(self, src_container, src_object, dst_container,
 | 778 | +         dst_object=False,
 | 779 | +         source_version=None,
 | 780 | +         public=False,
 | 781 | +         content_type=None,
 | 782 | +         delimiter=None):
604 | 783 |         self.assert_account()
605 | 784 |         self.container = dst_container
606 | 785 |         dst_object = dst_object or src_object
607 | 786 |         src_path = path4url(src_container, src_object)
608 | | -         r = self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
609 | | -             source_version=source_version, public=public, content_type=content_type,
 | 787 | +         r = self.object_put(dst_object,
 | 788 | +             success=201,
 | 789 | +             copy_from=src_path,
 | 790 | +             content_length=0,
 | 791 | +             source_version=source_version,
 | 792 | +             public=public,
 | 793 | +             content_type=content_type,
610 | 794 |             delimiter=delimiter)
611 | 795 |         r.release()
612 | 796 | 
613 | | -     def move_object(self, src_container, src_object, dst_container, dst_object=False,
614 | | -         source_version = None, public=False, content_type=None, delimiter=None):
 | 797 | +     def move_object(self, src_container, src_object, dst_container,
 | 798 | +         dst_object=False,
 | 799 | +         source_version=None,
 | 800 | +         public=False,
 | 801 | +         content_type=None,
 | 802 | +         delimiter=None):
615 | 803 |         self.assert_account()
616 | 804 |         self.container = dst_container
617 | 805 |         dst_object = dst_object or src_object
618 | 806 |         src_path = path4url(src_container, src_object)
619 | | -         r = self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
620 | | -             source_version=source_version, public=public, content_type=content_type,
 | 807 | +         r = self.object_put(dst_object,
 | 808 | +             success=201,
 | 809 | +             move_from=src_path,
 | 810 | +             content_length=0,
 | 811 | +             source_version=source_version,
 | 812 | +             public=public,
 | 813 | +             content_type=content_type,
621 | 814 |             delimiter=delimiter)
622 | 815 |         r.release()
623 | 816 | 
625 | 818 |
"""Get accounts that share with self.account""" |
626 | 819 |
self.assert_account() |
627 | 820 |
|
628 |
self.set_param('format','json') |
|
629 |
self.set_param('limit',limit, iff = limit is not None)
|
|
630 |
self.set_param('marker',marker, iff = marker is not None)
|
|
821 |
self.set_param('format', 'json')
|
|
822 |
self.set_param('limit', limit, iff=limit is not None)
|
|
823 |
self.set_param('marker', marker, iff=marker is not None)
|
|
631 | 824 |
|
632 | 825 |
path = '' |
633 | 826 |
success = kwargs.pop('success', (200, 204)) |
634 |
r = self.get(path, *args, success = success, **kwargs)
|
|
827 |
r = self.get(path, *args, success=success, **kwargs)
|
|
635 | 828 |
return r.json |
636 | 829 |
|
637 | 830 |
def get_object_versionlist(self, path): |
638 | 831 |
self.assert_container() |
639 | 832 |
r = self.object_get(path, format='json', version='list') |
640 | 833 |
return r.json['versions'] |
641 |
|