1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
36 # Monkey-patch everything for gevent early on
37 #gevent.monkey.patch_all()
40 from os import fstat, path
41 from hashlib import new as newhashlib
42 from time import time, sleep
43 from datetime import datetime
46 from binascii import hexlify
48 from kamaki.clients.pithos_rest_api import PithosRestAPI
49 from kamaki.clients.storage import ClientError
50 from kamaki.clients.utils import path4url, filter_in
51 from StringIO import StringIO
def pithos_hash(block, blockhash):
    """Return the Pithos hash of a data block.

    The hash is the hex digest, under the given algorithm, of the block with
    its trailing NUL padding stripped.

    :param block: (str) the raw block data
    :param blockhash: (str) a hashlib algorithm name, e.g. 'sha256'
    :returns: (str) the hex digest of the stripped block
    """
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    # Bug fix: the digest was computed but never returned, while callers
    # (e.g. _caclulate_uploaded_blocks) use the return value as the hash.
    return hexlify(h.digest())
58 def _range_up(start, end, a_range):
60 (rstart, rend) = a_range.split('-')
61 (rstart, rend) = (int(rstart), int(rend))
62 if rstart > end or rend < start:
70 class PithosClient(PithosRestAPI):
71 """GRNet Pithos API client"""
73 def __init__(self, base_url, token, account=None, container = None):
74 super(PithosClient, self).__init__(base_url, token, account = account,
75 container = container)
76 self.async_pool = None
    def purge_container(self):
        """Delete the container together with its whole history (until=now)."""
        # NOTE(review): the tail of this method (e.g. releasing/returning the
        # response) is not visible in this view; `r` appears unused here.
        r = self.container_delete(until=unicode(time()))
    def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        # NOTE(review): the signature tail (presumably a `public` parameter
        # and the closing parenthesis) is not visible in this view.
        # This is a naive implementation, it loads the whole file in memory
        #Look in pithos for a nice implementation
        # Upload an object in a single PUT request (no block chunking).
        self.assert_container()
        # When withHashFile is set, f should hold a json hashmap; round-trip
        # through json to validate it.
        # NOTE(review): the try/except and conditional lines around the json
        # validation below are not visible in this view.
        data = json.dumps(json.loads(data))
        raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
        raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
        # Plain upload path: read the whole payload into memory at once.
        data = f.read(size) if size is not None else f.read()
        r = self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
            public=public, success=201)
105 #upload_* auxiliary methods
    def put_block_async(self, data, hash):
        # Upload one block in a greenlet, silencing greenlet error output.
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                # NOTE(review): the try/finally structure around these lines
                # (saving/restoring stderr) is not visible in this view.
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
                if hasattr(sys, '_stderr'):
        # Lazily create the greenlet pool; default pool size is 5.
        POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
        g = SilentGreenlet(self.put_block, data, hash)
        self.async_pool.start(g)
        # NOTE(review): callers store the result of this call, but no
        # `return g` is visible in this view - confirm upstream.
122 def put_block(self, data, hash):
123 r = self.container_post(update=True, content_type='application/octet-stream',
124 content_length=len(data), data=data, format='json')
125 assert r.json[0] == hash, 'Local hash does not match server'
127 def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
128 content_disposition=None, content_type=None, sharing=None, public=None):
129 self.assert_container()
130 obj_content_type = 'application/octet-stream' if content_type is None else content_type
131 r = self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
132 content_disposition=content_disposition, content_type=content_type, permitions=sharing,
133 public=public, manifest='%s/%s'%(self.container,obj))
136 def _get_file_block_info(self, fileobj, size=None):
137 meta = self.get_container_info()
138 blocksize = int(meta['x-container-block-size'])
139 blockhash = meta['x-container-block-hash']
140 size = size if size is not None else fstat(fileobj.fileno()).st_size
141 nblocks = 1 + (size - 1) // blocksize
142 return (blocksize, blockhash, size, nblocks)
    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
            content_type=None, etag=None, content_encoding=None, content_disposition=None,
            permitions=None, public=None, success=(201, 409)):
        """Post the full hashmap; ask the server which blocks are missing.

        A 201 means the object was fully created from known blocks; a 409
        carries the list of block hashes the server still needs.

        NOTE(review): the end of the object_put call (presumably
        success=success) and the return statements are not visible in this
        view.
        """
        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
            json=json, etag=etag, content_encoding=content_encoding,
            content_disposition=content_disposition, permitions=permitions, public=public,
        if r.status_code == 201:
    def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
        # NOTE(review): the signature tail (progress-callback parameter and
        # closing parenthesis) is not visible in this view. The misspelled
        # name is kept - callers invoke `_caclulate_uploaded_blocks`.
        # Hash every local block, filling `hashes` (ordered) and `hmap`
        # (hash -> (offset, bytes)).
        hash_gen = hash_cb(nblocks)
        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            hash = pithos_hash(block, blockhash)
            hmap[hash] = (offset, bytes)
            # NOTE(review): offset/bytes bookkeeping and progress-callback
            # lines are not visible in this view.
        assert offset == size
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
        """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)

        NOTE(review): many lines of this method are not visible in this view
        (loop headers, file seeking, progress plumbing); the closing of this
        docstring was also missing and has been restored.
        """
        upload_gen = upload_cb(len(missing))
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self.put_block_async(data, hash)
        # drop greenlets that have already completed
        flying = [r for r in flying if not r.ready()]
        except StopIteration:
        # wait for all in-flight block uploads to finish
        gevent.joinall(flying)
        # collect failed greenlets and surface them as one ClientError
        failures = [r for r in flying if r.exception]
        details = ', '.join(['(%s).%s'%(i,r.exception) for i,r in enumerate(failures)])
        raise ClientError(message="Block uploading failed", status=505, details=details)
    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        # NOTE(review): the signature tail (presumably a `public` parameter
        # and the closing parenthesis) is not visible in this view.
        # Chunked upload: hash local blocks, post the hashmap, upload only
        # the blocks the server reports missing, then commit the hashmap.
        self.assert_container()
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream' if content_type is None else content_type
        self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
        # NOTE(review): the tail of the call above (hash_cb forwarding) is
        # not visible in this view.
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
            permitions=sharing, public=public)
        # NOTE(review): handling for the 'no blocks missing' fast path is
        # not visible in this view.
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
        # commit the full hashmap now that every block exists server-side
        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
            json=hashmap, success=201)
233 #download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        # Returns (blocksize, blockhash, total_size, hashes, map_dict) where
        # map_dict maps each block hash to its block index.
        # Temporarily pop data_range: the hashmap GET must not be ranged.
        myrange = restargs.pop('data_range') if 'data_range' in restargs.keys() else None
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # NOTE(review): the initialization and filling of map_dict (the loop
        # body below) are not visible in this view.
        for i, h in enumerate(hashmap['hashes']):
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
        """Download the object sequentially, one ranged GET per block."""
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash == None:
                # NOTE(review): the branch body (presumably `continue`) is
                # not visible in this view.
            start = blocksize*blockid
            # last block may be shorter than blocksize
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
            (start, end) = _range_up(start, end, range)
            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
            r = self.object_get(obj, success=(200, 206), **restargs)
            # NOTE(review): writing the fetched block to dst and progress
            # callbacks are not visible in this view.
    def _get_block_async(self, obj, **restargs):
        # Fire an asynchronous (greenlet) ranged GET for one block; greenlet
        # error reporting is silenced by swapping stderr.
        class SilentGreenlet(gevent.Greenlet):
            def _report_error(self, exc_info):
                # NOTE(review): the try/finally structure around these lines
                # is not visible in this view.
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
                if hasattr(sys, '_stderr'):
                    sys.stderr = sys._stderr
        if not hasattr(self, 'POOL_SIZE'):
            # NOTE(review): the default POOL_SIZE assignment is not visible
            # in this view.
        if self.async_pool is None:
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
        self.async_pool.start(g)
        # NOTE(review): callers store the result of this call, but no
        # `return g` is visible in this view - confirm upstream.
279 def _hash_from_file(self, fp, start, size, blockhash):
281 block = fp.read(size)
282 h = newhashlib(blockhash)
283 h.update(block.strip('\x00'))
284 return hexlify(h.digest())
    def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
        """write the results of a greenleted rest call to a file
        @offset: the offset of the file up to blocksize - e.g. if the range is 10-100, all
        blocks will be written to normal_position - 10"""
        # NOTE(review): the initialization of `finished`, the readiness
        # checks on each greenlet and the return statement are not visible
        # in this view.
        for start, g in flying_greenlets.items():
            block = g.value.content
            # write the block at its absolute position minus the range offset
            local_file.seek(start - offset)
            local_file.write(block)
            finished.append(flying_greenlets.pop(start))
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange = None, **restargs):
        # Download blocks concurrently via greenlets; when resuming, blocks
        # whose local hash already matches are skipped.
        # NOTE(review): several lines are not visible in this view (offset
        # initialization, progress callbacks, pool-throttling details).
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying_greenlets = {}
        finished_greenlets = []
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart%blocksize
        for block_hash, blockid in remote_hashes.items():
            start = blocksize*blockid
            if start < file_size and block_hash == self._hash_from_file(local_file,
                start, blocksize, blockhash):
                # NOTE(review): the 'block already present locally' handling
                # is not visible in this view.
            # throttle: drain finished greenlets when the pool is full
            if len(flying_greenlets) >= self.POOL_SIZE:
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
            # last block may be shorter than blocksize
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
            (start, end) = _range_up(start, end, filerange)
            restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
        # drain the remaining greenlets and flush them to the file
        while len(flying_greenlets) > 0:
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
        gevent.joinall(finished_greenlets)
    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
            range=None, if_match=None, if_none_match=None, if_modified_since=None,
            if_unmodified_since=None):
        """Download an object into the seekable file object dst.

        NOTE(review): several lines of this method are not visible in this
        view (e.g. the if_match restargs entry, the unpacking of the blocks
        info, the sync/async dispatch conditions and the final cleanup).
        """
        restargs=dict(version=version,
            data_range = None if range is None else 'bytes=%s'%range,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        # progress bar generator, advanced once per downloaded block
        self.progress_bar_gen = download_cb(len(remote_hashes))
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
        self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
            resume, range, **restargs)
        dst.truncate(total_size)
    #Command Progress Bar method
        # NOTE(review): the method header (presumably `def _cb_next(self):`)
        # for the next two lines is not visible in this view.
        if hasattr(self, 'progress_bar_gen'):
            self.progress_bar_gen.next()
    def _complete_cb(self):
        # Exhaust the progress-bar generator so the bar reaches completion.
        # NOTE(review): the surrounding loop/try lines are not visible in
        # this view.
            self.progress_bar_gen.next()
    #Untested - except if download_object is tested first
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
            if_modified_since=None, if_unmodified_since=None, data_range=None):
        """Fetch an object's hashmap (a GET with hashmap=True).

        NOTE(review): the `try:` opening this block, the except-branch body
        and the return statements are not visible in this view.
        """
        r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
            if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since, data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
398 def set_account_group(self, group, usernames):
399 r = self.account_post(update=True, groups = {group:usernames})
402 def del_account_group(self, group):
403 r = self.account_post(update=True, groups={group:[]})
406 def get_account_info(self, until=None):
407 r = self.account_head(until=until)
408 if r.status_code == 401:
409 raise ClientError("No authorization")
412 def get_account_quota(self):
413 return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
415 def get_account_versioning(self):
416 return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
418 def get_account_meta(self, until=None):
419 return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
421 def get_account_group(self):
422 return filter_in(self.get_account_info(), 'X-Account-Group-')
424 def set_account_meta(self, metapairs):
425 assert(type(metapairs) is dict)
426 r = self.account_post(update=True, metadata=metapairs)
429 def del_account_meta(self, metakey):
430 r = self.account_post(update=True, metadata={metakey:''})
433 def set_account_quota(self, quota):
434 r = self.account_post(update=True, quota=quota)
437 def set_account_versioning(self, versioning):
438 r = self.account_post(update=True, versioning = versioning)
441 def list_containers(self):
442 r = self.account_get()
445 def del_container(self, until=None, delimiter=None):
446 self.assert_container()
447 r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
449 if r.status_code == 404:
450 raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
451 elif r.status_code == 409:
452 raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
454 def get_container_versioning(self, container):
455 self.container = container
456 return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
458 def get_container_quota(self, container):
459 self.container = container
460 return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
462 def get_container_info(self, until = None):
463 r = self.container_head(until=until)
466 def get_container_meta(self, until = None):
467 return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
469 def get_container_object_meta(self, until = None):
470 return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
472 def set_container_meta(self, metapairs):
473 assert(type(metapairs) is dict)
474 r = self.container_post(update=True, metadata=metapairs)
477 def del_container_meta(self, metakey):
478 r = self.container_post(update=True, metadata={metakey:''})
481 def set_container_quota(self, quota):
482 r = self.container_post(update=True, quota=quota)
485 def set_container_versioning(self, versioning):
486 r = self.container_post(update=True, versioning=versioning)
489 def del_object(self, obj, until=None, delimiter=None):
490 self.assert_container()
491 r = self.object_delete(obj, until=until, delimiter=delimiter)
494 def set_object_meta(self, object, metapairs):
495 assert(type(metapairs) is dict)
496 r = self.object_post(object, update=True, metadata=metapairs)
499 def del_object_meta(self, metakey, object):
500 r = self.object_post(object, update=True, metadata={metakey:''})
503 def publish_object(self, object):
504 r = self.object_post(object, update=True, public=True)
507 def unpublish_object(self, object):
508 r = self.object_post(object, update=True, public=False)
511 def get_object_info(self, obj, version=None):
512 r = self.object_head(obj, version=version)
515 def get_object_meta(self, obj, version=None):
516 return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
    def get_object_sharing(self, object):
        """Parse the X-Object-Sharing header into a permissions mapping.

        NOTE(review): several lines are not visible in this view (the empty
        result fast path, the loop over `perms`, the result-dict filling and
        the return statement).
        """
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
532 def set_object_sharing(self, object, read_permition = False, write_permition = False):
533 """Give read/write permisions to an object.
534 @param object is the object to change sharing permitions onto
535 @param read_permition is a list of users and user groups that get read permition for this object
536 False means all previous read permitions will be removed
537 @param write_perimition is a list of users and user groups to get write permition for this object
538 False means all previous read permitions will be removed
541 perms['read'] = read_permition if isinstance(read_permition, list) else ''
542 perms['write'] = write_permition if isinstance(write_permition, list) else ''
543 r = self.object_post(object, update=True, permitions=perms)
546 def del_object_sharing(self, object):
547 self.set_object_sharing(object)
    def append_object(self, object, source_file, upload_cb = None):
        """@param upload_db is a generator for showing progress of upload
        to caller application, e.g. a progress bar. Its next is called
        whenever a block is uploaded
        """
        # NOTE(review): the closing of the docstring above was missing and
        # has been restored; the initialization of `offset`, the per-block
        # offset bookkeeping and the progress-callback calls are not visible
        # in this view.
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1)//blocksize
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        # append the file block by block via ranged POSTs
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            r = self.object_post(object, update=True, content_range='bytes */*',
                content_type='application/octet-stream', content_length=len(block), data=block)
            if upload_cb is not None:
572 def truncate_object(self, object, upto_bytes):
573 r = self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
574 content_type='application/octet-stream', object_bytes=upto_bytes,
575 source_object=path4url(self.container, object))
    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object with given source file
        @start the part of the remote object to start overwriting from, in bytes
        @end the part of the remote object to stop overwriting to, in bytes
        """
        # NOTE(review): the closing of the docstring above was missing and
        # has been restored; the initialization of `offset`, the per-block
        # offset bookkeeping and the progress-callback calls are not visible
        # in this view.
        self.assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1)//blocksize
        if upload_cb is not None:
            upload_gen = upload_cb(nblocks)
        # overwrite the target byte range block by block via ranged POSTs
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
            r = self.object_post(object, update=True, content_type='application/octet-stream',
                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
            if upload_cb is not None:
602 def copy_object(self, src_container, src_object, dst_container, dst_object=False,
603 source_version = None, public=False, content_type=None, delimiter=None):
604 self.assert_account()
605 self.container = dst_container
606 dst_object = dst_object or src_object
607 src_path = path4url(src_container, src_object)
608 r = self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
609 source_version=source_version, public=public, content_type=content_type,
613 def move_object(self, src_container, src_object, dst_container, dst_object=False,
614 source_version = None, public=False, content_type=None, delimiter=None):
615 self.assert_account()
616 self.container = dst_container
617 dst_object = dst_object or src_object
618 src_path = path4url(src_container, src_object)
619 r = self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
620 source_version=source_version, public=public, content_type=content_type,
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account"""
        self.assert_account()
        # NOTE(review): the definition of `path` used below, the request
        # headers and the return statement are not visible in this view.
        self.set_param('format','json')
        self.set_param('limit',limit, iff = limit is not None)
        self.set_param('marker',marker, iff = marker is not None)
        # allow callers to override the accepted status codes
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success = success, **kwargs)
637 def get_object_versionlist(self, path):
638 self.assert_container()
639 r = self.object_get(path, format='json', version='list')
640 return r.json['versions']