1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 # Monkey-patch everything for gevent early on
37 #gevent.monkey.patch_all()
40 from os import fstat, path
41 from hashlib import new as newhashlib
42 from time import time, sleep
43 from datetime import datetime
46 from binascii import hexlify
48 from .pithos_rest_api import PithosRestAPI
49 from .storage import ClientError
50 from .utils import path4url, filter_in
51 from StringIO import StringIO
def pithos_hash(block, blockhash):
    # Compute the Pithos hash of a data block: the named digest algorithm
    # applied to the block with trailing NUL padding stripped.
    # NOTE(review): the return of the hex digest is not visible in this view.
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
def _range_up(start, end, a_range):
    # Clip the [start, end] block span against a "rstart-rend" range string.
    (rstart, rend) = a_range.split('-')
    (rstart, rend) = (int(rstart), int(rend))
    # No overlap between the block and the requested range.
    # NOTE(review): the branch bodies / return are not visible in this view.
    if rstart > end or rend < start:
70 class PithosClient(PithosRestAPI):
71 """GRNet Pithos API client"""
73 def __init__(self, base_url, token, account=None, container = None):
74 super(PithosClient, self).__init__(base_url, token, account = account,
75 container = container)
76 self.async_pool = None
78 def purge_container(self):
79 self.container_delete(until=unicode(time()))
    def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        # This is a naive implementation, it loads the whole file in memory
        #Look in pithos for a nice implementation
        self.assert_container()
        # Round-trip through json to validate a hashmap file's content.
        # NOTE(review): the enclosing try/except of this validation and the end
        # of the signature are not visible here — verify against VCS.
        data = json.dumps(json.loads(data))
        raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
        raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
        # Whole payload is read into memory at once (naive, as noted above).
        data = f.read(size) if size is not None else f.read()
        self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
            public=public, success=201)
103 #upload_* auxiliary methods
    def put_block_async(self, data, hash):
        # Upload one block on a greenlet drawn from a bounded pool.
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                # Redirect stderr so gevent's default traceback print is silenced.
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
                # NOTE(review): the stderr-restore statement appears elided here.
                if hasattr(sys, '_stderr'):
        # Fall back to a pool of 5 when the subclass defines no POOL_SIZE.
        POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
        g = SilentGreenlet(self.put_block, data, hash)
        self.async_pool.start(g)
120 def put_block(self, data, hash):
121 r = self.container_post(update=True, content_type='application/octet-stream',
122 content_length=len(data), data=data, format='json')
123 assert r.json[0] == hash, 'Local hash does not match server'
125 def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
126 content_disposition=None, content_type=None, sharing=None, public=None):
127 self.assert_container()
128 obj_content_type = 'application/octet-stream' if content_type is None else content_type
129 self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
130 content_disposition=content_disposition, content_type=content_type, permitions=sharing,
131 public=public, manifest='%s/%s'%(self.container,obj))
133 def _get_file_block_info(self, fileobj, size=None):
134 meta = self.get_container_info()
135 blocksize = int(meta['x-container-block-size'])
136 blockhash = meta['x-container-block-hash']
137 size = size if size is not None else fstat(fileobj.fileno()).st_size
138 nblocks = 1 + (size - 1) // blocksize
139 return (blocksize, blockhash, size, nblocks)
    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
        content_type=None, etag=None, content_encoding=None, content_disposition=None,
        permitions=None, public=None, success=(201, 409)):
        # PUT the hashmap; 201 means all blocks are already on the server,
        # a 409 reply carries the hashes the server is missing.
        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
            json=json, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, permitions=permitions, public=public,
        # NOTE(review): the end of this call and both return paths (201 vs 409)
        # are not visible in this view.
        if r.status_code == 201:
    def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
        # Read fileobj block by block, appending each block hash to `hashes`
        # (in order) and recording hash -> (offset, length) in `hmap`.
        # (Method name typo "caclulate" is kept — upload_object calls it.)
        # NOTE(review): end of signature and offset bookkeeping lines elided.
        hash_gen = hash_cb(nblocks)
        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            hash = pithos_hash(block, blockhash)
            hmap[hash] = (offset, bytes)
        # The whole file must have been consumed exactly.
        assert offset == size
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
        upload_gen = upload_cb(len(missing))
        # Each missing hash maps to (offset, length) within the source file.
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self.put_block_async(data, hash)
        # Drop greenlets that already completed.
        flying = [r for r in flying if not r.ready()]
        except StopIteration:
        # Wait for all remaining uploads before checking failures.
        gevent.joinall(flying)
        failures = [r for r in flying if r.exception]
        details = ', '.join(['%s.%s'%(i,r) for i,r in enumerate(failures)])
        raise ClientError(message="Block uploading failed", status=505, details=details)
    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        # Upload f as obj via the hashmap protocol: hash locally, send the
        # hashmap, upload only the blocks the server reports as missing,
        # then register the complete hashmap.
        # NOTE(review): end of signature and some call closings are elided.
        self.assert_container()
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream' if content_type is None else content_type
        self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
            permitions=sharing, public=public)
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
        # Final PUT registers the complete hashmap under obj.
        self.object_put(obj, format='json', hashmap=True, content_type=content_type,
            json=hashmap, success=201)
228 #download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        # Temporarily drop data_range: the hashmap request must not be ranged.
        myrange = restargs.pop('data_range') if 'data_range' in restargs.keys() else None
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # NOTE(review): map_dict initialization/fill appear elided here.
        for i, h in enumerate(hashmap['hashes']):
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
        """Fetch every block in order with a ranged GET and write it to dst."""
        for blockid, blockhash in enumerate(remote_hashes):
            # NOTE(review): the skip branch body for None hashes is elided.
            if blockhash == None:
            start = blocksize*blockid
            # The final block may be shorter than blocksize.
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
    def _get_block_async(self, obj, **restargs):
        # GET one block on a greenlet from the shared pool; returns the greenlet.
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                # Suppress gevent's stderr traceback, then restore stderr.
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
                if hasattr(sys, '_stderr'):
                    sys.stderr = sys._stderr
        # NOTE(review): the POOL_SIZE default assignment appears elided here.
        if not hasattr(self, 'POOL_SIZE'):
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
        self.async_pool.start(g)
    def _hash_from_file(self, fp, start, size, blockhash):
        """Hash `size` bytes of fp with the container's block hash algorithm.

        NOTE(review): no seek to `start` is visible here (presumably elided);
        also this strips NULs from both ends while pithos_hash only rstrips —
        verify whether that difference is intended.
        """
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())
    def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
        """write the results of a greenleted rest call to a file
        @offset: the offset of the file up to blocksize - e.g. if the range is 10-100, all
        blocks will be written to normal_position - 10"""
        # Collect completed greenlets, writing each downloaded block at its
        # (range-adjusted) position in local_file.
        for start, g in flying_greenlets.items():
            block = g.value.content
            local_file.seek(start - offset)
            local_file.write(block)
            finished.append(flying_greenlets.pop(start))
        # NOTE(review): `finished` initialization and the return are elided.
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
        blockhash=None, resume=False, filerange = None, **restargs):
        """Download blocks concurrently on greenlets, skipping blocks that are
        already present locally when resuming."""
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying_greenlets = {}
        finished_greenlets = []
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            # Blocks are written at position - offset when a range is requested.
            offset = rstart if blocksize > rstart else rstart%blocksize
        for block_hash, blockid in remote_hashes.items():
            start = blocksize*blockid
            # Resume: skip blocks whose local content already matches.
            # NOTE(review): the skip/continue body appears elided here.
            if start < file_size and block_hash == self._hash_from_file(local_file,
                start, blocksize, blockhash):
            # Throttle: drain finished greenlets when the pool is full.
            if len(flying_greenlets) >= self.POOL_SIZE:
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
            (start, end) = _range_up(start, end, filerange)
            restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
        # Drain whatever is still in flight.
        while len(flying_greenlets) > 0:
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
        gevent.joinall(finished_greenlets)
    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
        range=None, if_match=None, if_none_match=None, if_modified_since=None,
        if_unmodified_since=None):
        """Download obj into dst (a seekable, writable file) block by block."""
        restargs=dict(version=version,
            data_range = None if range is None else 'bytes=%s'%range,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
        # NOTE(review): if_match does not appear to be forwarded; the line may
        # be elided in this view — verify against VCS.
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(remote_hashes))
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
        self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
            resume, range, **restargs)
        # Async path may write past the end; trim to the exact object size.
        dst.truncate(total_size)
366 #Command Progress Bar method
368 if hasattr(self, 'progress_bar_gen'):
370 self.progress_bar_gen.next()
    def _complete_cb(self):
        # Drain the progress-bar generator to completion.
        # NOTE(review): the enclosing loop/try of this call appears elided.
        self.progress_bar_gen.next()
380 #Untested - except is download_object is tested first
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
        if_modified_since=None, if_unmodified_since=None, data_range=None):
        """Return the object's hashmap (json body of a hashmap GET)."""
        # NOTE(review): the enclosing try and the success return are elided.
        r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
            if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since, data_range=data_range)
        except ClientError as err:
            # 304/412 mean the precondition failed — nothing to fetch.
            if err.status == 304 or err.status == 412:
393 def set_account_group(self, group, usernames):
394 self.account_post(update=True, groups = {group:usernames})
396 def del_account_group(self, group):
397 self.account_post(update=True, groups={group:[]})
    def get_account_info(self, until=None):
        """HEAD the account; raise ClientError when not authorized."""
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization")
        # NOTE(review): the return (presumably the reply headers) is elided.
405 def get_account_quota(self):
406 return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
408 def get_account_versioning(self):
409 return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
411 def get_account_meta(self, until=None):
412 return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
414 def get_account_group(self):
415 return filter_in(self.get_account_info(), 'X-Account-Group-')
417 def set_account_meta(self, metapairs):
418 assert(type(metapairs) is dict)
419 self.account_post(update=True, metadata=metapairs)
421 def del_account_meta(self, metakey):
422 self.account_post(update=True, metadata={metakey:''})
424 def set_account_quota(self, quota):
425 self.account_post(update=True, quota=quota)
427 def set_account_versioning(self, versioning):
428 self.account_post(update=True, versioning = versioning)
    def list_containers(self):
        """GET the account's container listing."""
        r = self.account_get()
        # NOTE(review): parsing of the reply and the return are elided here.
434 def del_container(self, until=None, delimiter=None):
435 self.assert_container()
436 r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
437 if r.status_code == 404:
438 raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
439 elif r.status_code == 409:
440 raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
442 def get_container_versioning(self, container):
443 self.container = container
444 return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
446 def get_container_quota(self, container):
447 self.container = container
448 return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
    def get_container_info(self, until = None):
        """HEAD the current container; callers read policy/meta from headers."""
        r = self.container_head(until=until)
        # NOTE(review): the return (presumably the reply headers) is elided.
454 def get_container_meta(self, until = None):
455 return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
457 def get_container_object_meta(self, until = None):
458 return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
460 def set_container_meta(self, metapairs):
461 assert(type(metapairs) is dict)
462 self.container_post(update=True, metadata=metapairs)
464 def del_container_meta(self, metakey):
465 self.container_post(update=True, metadata={metakey:''})
467 def set_container_quota(self, quota):
468 self.container_post(update=True, quota=quota)
470 def set_container_versioning(self, versioning):
471 self.container_post(update=True, versioning=versioning)
473 def del_object(self, obj, until=None, delimiter=None):
474 self.assert_container()
475 self.object_delete(obj, until=until, delimiter=delimiter)
477 def set_object_meta(self, object, metapairs):
478 assert(type(metapairs) is dict)
479 self.object_post(object, update=True, metadata=metapairs)
481 def del_object_meta(self, metakey, object):
482 self.object_post(object, update=True, metadata={metakey:''})
484 def publish_object(self, object):
485 self.object_post(object, update=True, public=True)
487 def unpublish_object(self, object):
488 self.object_post(object, update=True, public=False)
    def get_object_info(self, obj, version=None):
        """HEAD the object; callers read its meta from the reply headers."""
        r = self.object_head(obj, version=version)
        # NOTE(review): the return (presumably the reply headers) is elided.
494 def get_object_meta(self, obj, version=None):
495 return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
    def get_object_sharing(self, object):
        """Parse and return the object's X-Object-Sharing permissions."""
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
        # Header format: "read=a,b;write=c" — split entries, then key=value.
        perms = r['x-object-sharing'].split(';')
            raise ClientError('Incorrect reply format')
            (key, val) = perm.strip().split('=')
        # NOTE(review): the loop over perms and the return are partly elided.
    def set_object_sharing(self, object, read_permition = False, write_permition = False):
        """Give read/write permisions to an object.
        @param object is the object to change sharing permitions onto
        @param read_permition is a list of users and user groups that get read permition for this object
        False means all previous read permitions will be removed
        @param write_perimition is a list of users and user groups to get write permition for this object
        False means all previous read permitions will be removed
        """
        # NOTE(review): the `perms` dict initialization appears elided here.
        perms['read'] = read_permition if isinstance(read_permition, list) else ''
        perms['write'] = write_permition if isinstance(write_permition, list) else ''
        self.object_post(object, update=True, permitions=perms)
524 def del_object_sharing(self, object):
525 self.set_object_sharing(object)
    def append_object(self, object, source_file, upload_cb = None):
        """@param upload_db is a generator for showing progress of upload
        to caller application, e.g. a progress bar. Its next is called
        whenever a block is uploaded
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1)//blocksize
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            # Append semantics: content_range 'bytes */*' means "at the end".
            self.object_post(object, update=True, content_range='bytes */*',
                content_type='application/octet-stream', content_length=len(block), data=block)
            if upload_cb is not None:
        # NOTE(review): offset bookkeeping and upload_gen.next() appear elided.
549 def truncate_object(self, object, upto_bytes):
550 self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
551 content_type='application/octet-stream', object_bytes=upto_bytes,
552 source_object=path4url(self.container, object))
    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object with given source file
        @start the part of the remote object to start overwriting from, in bytes
        @end the part of the remote object to stop overwriting to, in bytes
        """
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Number of bytes (and blocks) that the [start, end] span covers.
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1)//blocksize
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
            self.object_post(object, update=True, content_type='application/octet-stream',
                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
            if upload_cb is not None:
        # NOTE(review): offset bookkeeping and upload_gen.next() appear elided.
    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version = None, public=False, content_type=None, delimiter=None):
        """Server-side copy; dst_object defaults to the source object's name."""
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        # NOTE(review): the end of this call (delimiter kwarg?) is elided.
        self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
            source_version=source_version, public=public, content_type=content_type,
    def move_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version = None, public=False, content_type=None, delimiter=None):
        """Server-side move; dst_object defaults to the source object's name."""
        self.assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        # NOTE(review): the end of this call (delimiter kwarg?) is elided.
        self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
            source_version=source_version, public=public, content_type=content_type,
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()
        self.set_param('format','json')
        self.set_param('limit',limit, iff = limit is not None)
        self.set_param('marker',marker, iff = marker is not None)
        # NOTE(review): the construction of `path` and the return are elided.
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success = success, **kwargs)
610 def get_object_versionlist(self, path):
611 self.assert_container()
612 r = self.object_get(path, format='json', version='list')
613 return r.json['versions']