offset += bytes
if hash_cb:
hash_gen.next()
-
assert offset == size
- path = '/%s/%s/%s' % (self.account, self.container, object)
- params = dict(format='json', hashmap='')
def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
    """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)

    :param missing: list of block hashes the server reported as absent
    :param hmap: dict mapping block hash -> (offset, bytes) into fileobj
    :param fileobj: open local file to read block data from
    :param upload_cb: optional progress generator factory; called with the
        number of missing blocks, then next() is invoked once per block
    :raises ClientError: status 505 if any block upload failed
    """
    if upload_cb:
        upload_gen = upload_cb(len(missing))
        upload_gen.next()  # prime the progress generator

    flying = []
    for hash in missing:
        offset, bytes = hmap[hash]
        fileobj.seek(offset)
        data = fileobj.read(bytes)
        r = self.put_block_async(data, hash)
        flying.append(r)
        # opportunistically reap greenlets that already finished,
        # ticking the progress bar for each; a failed one aborts early
        for r in flying:
            if r.ready():
                if r.exception:
                    raise r.exception
                if upload_cb:
                    upload_gen.next()
        flying = [r for r in flying if not r.ready()]
    # NOTE(review): the progress generator is drained here, BEFORE
    # joinall, so remaining ticks fire while uploads are still in
    # flight - confirm this ordering is intended
    while upload_cb:
        try:
            upload_gen.next()
        except StopIteration:
            break
    gevent.joinall(flying)

    # collect failures from the greenlets that finished only at joinall
    failures = [r for r in flying if r.exception]
    if len(failures):
        details = ', '.join(['(%s).%s'%(i,r.exception) for i,r in enumerate(failures)])
        raise ClientError(message="Block uploading failed", status=505, details=details)
+
def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
    """Upload local file f to the current container as object obj.

    Uses the hashmap protocol: block hashes are computed locally, the
    server reports which blocks it is missing, and only those are sent.

    :param f: open local file to upload
    :param size: number of bytes to upload (defaults per _get_file_block_info)
    :param hash_cb: progress generator factory for the hashing phase
    :param upload_cb: progress generator factory for the upload phase
    :param sharing: sharing directives, forwarded as 'permitions'
    """
    self.assert_container()

    #init
    block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
    (hashes, hmap, offset) = ([], {}, 0)
    content_type = 'application/octet-stream' if content_type is None else content_type

    # NOTE(review): '_caclulate_uploaded_blocks' is a misspelling of
    # 'calculate'; renaming requires touching its definition elsewhere
    self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
        hash_cb=hash_cb)

    hashmap = dict(bytes=size, hashes=hashes)
    missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
        etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
        permitions=sharing, public=public)

    # None presumably means the server already had every block and the
    # object is complete - confirm against _get_missing_hashes
    if missing is None:
        return
    self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)

    # final hashmap PUT materializes the object from its blocks
    r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
        json=hashmap, success=201)
    r.release()
+
#download_* auxiliary methods
#All untested
def _get_remote_blocks_info(self, obj, **restargs):
    """Fetch the remote hashmap of obj and derive block bookkeeping info.

    'data_range' is temporarily removed from restargs for the hashmap
    request (a byte range does not apply to the hashmap itself) and
    restored afterwards.

    :returns: (blocksize, blockhash, total_size, list of block hashes,
        dict mapping block hash -> block index)
    """
    #retrieve object hashmap
    had_range = 'data_range' in restargs
    myrange = restargs.pop('data_range', None)
    hashmap = self.get_object_hashmap(obj, **restargs)
    if had_range:
        # restore only if it was present - do not inject data_range=None
        restargs['data_range'] = myrange
    blocksize = int(hashmap['block_size'])
    blockhash = hashmap['block_hash']
    total_size = hashmap['bytes']
    #assert total_size/blocksize + 1 == len(hashmap['hashes'])
    map_dict = {}
    for i, h in enumerate(hashmap['hashes']):
        map_dict[h] = i
    return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, range, **restargs):
    """Download the object's blocks sequentially and write them to dst.

    :param remote_hashes: ordered list of block hashes (None entries skipped)
    :param range: optional user byte range, narrowed per block by _range_up
    :param dst: writable destination file object
    """
    for blockid, blockhash in enumerate(remote_hashes):
        if blockhash is None:
            continue
        start = blocksize*blockid
        # last block may be shorter than blocksize
        end = total_size-1 if start+blocksize > total_size else start+blocksize-1
        (start, end) = _range_up(start, end, range)
        restargs['data_range'] = 'bytes=%s-%s'%(start, end)
        r = self.object_get(obj, success=(200, 206), **restargs)
        self._cb_next()
        dst.write(r.content)
        dst.flush()
def _get_block_async(self, obj, **restargs):
    """Spawn a pooled greenlet that GETs one block of obj; returns the greenlet."""
    class SilentGreenlet(gevent.Greenlet):
        # suppress gevent's default stderr traceback dump for failed
        # greenlets; the caller inspects .exception explicitly instead
        def _report_error(self, exc_info):
            try:
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
            finally:
                # NOTE(review): assumes sys._stderr holds the saved real
                # stderr, set somewhere outside this block - confirm
                if hasattr(sys, '_stderr'):
                    sys.stderr = sys._stderr
    if not hasattr(self, 'POOL_SIZE'):
        self.POOL_SIZE = 5  # default concurrency if caller did not set it
    if self.async_pool is None:
        self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
    g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
    self.async_pool.start(g)
    return g
+
def _hash_from_file(self, fp, start, size, blockhash):
    """Hash one block of an open file.

    Reads size bytes at offset start, strips NUL padding and returns the
    hex digest computed with the blockhash algorithm.
    """
    fp.seek(start)
    chunk = fp.read(size)
    digester = newhashlib(blockhash)
    digester.update(chunk.strip('\x00'))
    return hexlify(digester.digest())
+
def _greenlet2file(self, flying_greenlets, local_file, offset = 0, **restargs):
    """write the results of a greenleted rest call to a file

    :param flying_greenlets: dict mapping block start offset -> greenlet;
        finished entries are removed from it in place
    :param offset: the offset of the file up to blocksize - e.g. if the range
        is 10-100, all blocks will be written to normal_position - 10
    :returns: list of the greenlets that were reaped on this pass
    """
    finished = []
    # NOTE(review): entries are popped while iterating .items(); safe on
    # Python 2 (items() returns a list), would break on Python 3
    for start, g in flying_greenlets.items():
        if g.ready():
            if g.exception:
                raise g.exception
            block = g.value.content
            # seek by the block's object offset, shifted for ranged downloads
            local_file.seek(start - offset)
            local_file.write(block)
            self._cb_next()
            finished.append(flying_greenlets.pop(start))
    local_file.flush()
    return finished
+
def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file,
        blockhash=None, resume=False, filerange=None, **restargs):
    """Download blocks concurrently (greenlets) and write them into local_file.

    :param remote_hashes: dict mapping block hash -> block index
    :param resume: if True, blocks already present (verified by hash)
        in local_file are skipped
    :param filerange: optional 'start-end' string limiting the download
    """
    # on resume, the current local size bounds which blocks may be reused
    file_size = fstat(local_file.fileno()).st_size if resume else 0
    flying_greenlets = {}
    finished_greenlets = []
    offset = 0
    if filerange is not None:
        # shift written positions so the ranged download starts at file pos 0
        rstart = int(filerange.split('-')[0])
        offset = rstart if blocksize > rstart else rstart%blocksize
    for block_hash, blockid in remote_hashes.items():
        start = blocksize*blockid
        # resume fast-path: local block already matches the remote hash
        if start < file_size and block_hash == self._hash_from_file(local_file,
                start, blocksize, blockhash):
            self._cb_next()
            continue
        # throttle: reap finished greenlets once the pool limit is reached
        if len(flying_greenlets) >= self.POOL_SIZE:
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
                **restargs)
        end = total_size-1 if start+blocksize > total_size else start+blocksize-1
        (start, end) = _range_up(start, end, filerange)
        if start == end:
            # block entirely outside the requested range
            self._cb_next()
            continue
        restargs['async_headers'] = dict(Range='bytes=%s-%s'%(start, end))
        flying_greenlets[start] = self._get_block_async(obj, **restargs)

    #check the greenlets
    while len(flying_greenlets) > 0:
        sleep(0.001)
        finished_greenlets += self._greenlet2file(flying_greenlets, local_file, offset,
            **restargs)

    gevent.joinall(finished_greenlets)
+
def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
        range=None, if_match=None, if_none_match=None, if_modified_since=None,
        if_unmodified_since=None):
    """Download remote object obj into writable file dst.

    Interactive destinations (tty) are written sequentially; regular
    files are filled concurrently with greenlets.

    :param download_cb: progress generator factory (called with block count)
    :param range: optional 'start-end' byte range string
    :param resume: reuse already-downloaded local blocks (async path only)

    NOTE(review): the 'overide' parameter (sic) is accepted but never
    referenced in this body - confirm whether it is dead.
    """
    restargs=dict(version=version,
        data_range = None if range is None else 'bytes=%s'%range,
        if_match=if_match,
        if_none_match=if_none_match,
        if_modified_since=if_modified_since,
        if_unmodified_since=if_unmodified_since)

    ( blocksize,
        blockhash,
        total_size,
        hash_list,
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
    assert total_size >= 0
    self.POOL_SIZE = 5

    if download_cb:
        self.progress_bar_gen = download_cb(len(remote_hashes))
        self._cb_next()

    if dst.isatty():
        self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, range, **restargs)
    else:
        self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, blockhash,
            resume, range, **restargs)
        # full download: trim any stale bytes past the object's real size
        if range is None:
            dst.truncate(total_size)

    self._complete_cb()
+
+ #Command Progress Bar method
+ def _cb_next(self):
+ if hasattr(self, 'progress_bar_gen'):
+ try:
+ self.progress_bar_gen.next()
+ except:
+ pass
+ def _complete_cb(self):
+ while True:
+ try:
+ self.progress_bar_gen.next()
+ except:
+ break
+
#Untested - expect it to work if download_object is tested first
def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
        if_modified_since=None, if_unmodified_since=None, data_range=None):
    """Fetch the hashmap of remote object obj.

    :returns: the hashmap as parsed json, or {} when a conditional
        request was not satisfied (304 Not Modified / 412 Precondition
        Failed are treated as 'no data', not errors)
    """
    try:
        r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
            if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since, data_range=data_range)
    except ClientError as err:
        if err.status in (304, 412):
            return {}
        raise
    return r.json
+
def set_account_group(self, group, usernames):
    """Create or replace an account group with the given member names."""
    self.account_post(update=True, groups={group: usernames}).release()

def del_account_group(self, group):
    """Empty a group's member list, effectively removing the group."""
    self.account_post(update=True, groups={group: []}).release()

def get_account_info(self, until=None):
    """Return the account headers, optionally as of a past timestamp."""
    r = self.account_head(until=until)
    if r.status_code == 401:
        raise ClientError("No authorization")
    return r.headers

def get_account_quota(self):
    """Return the account quota policy header."""
    return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch=True)

def get_account_versioning(self):
    """Return the account versioning policy header."""
    return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch=True)

def get_account_meta(self, until=None):
    """Return all X-Account-Meta-* headers."""
    return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

def get_account_group(self):
    """Return all X-Account-Group-* headers."""
    return filter_in(self.get_account_info(), 'X-Account-Group-')

def set_account_meta(self, metapairs):
    """Update account metadata from a dict of key/value pairs."""
    assert(type(metapairs) is dict)
    self.account_post(update=True, metadata=metapairs).release()

def del_account_meta(self, metakey):
    """Delete a single account metadata key."""
    self.account_post(update=True, metadata={metakey: ''}).release()

def set_account_quota(self, quota):
    """Set the account quota policy."""
    self.account_post(update=True, quota=quota).release()

def set_account_versioning(self, versioning):
    """Set the account versioning policy."""
    self.account_post(update=True, versioning=versioning).release()
+
def list_containers(self):
    """Return the account's container listing as parsed json."""
    return self.account_get().json

def del_container(self, until=None, delimiter=None):
    """Delete the current container.

    :raises ClientError: 404 if the container does not exist,
        409 if it is not empty
    """
    self.assert_container()
    r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
    r.release()
    problems = {
        404: 'Container "%s" does not exist',
        409: 'Container "%s" is not empty'}
    if r.status_code in problems:
        raise ClientError(problems[r.status_code]%self.container, r.status_code)
+
def get_container_versioning(self, container):
    """Switch to *container* and return its versioning policy header."""
    self.container = container
    return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')

def get_container_quota(self, container):
    """Switch to *container* and return its quota policy header."""
    self.container = container
    return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')

def get_container_info(self, until=None):
    """Return the current container's headers."""
    return self.container_head(until=until).headers

def get_container_meta(self, until=None):
    """Return the container's X-Container-Meta* headers."""
    return filter_in(self.get_container_info(until=until), 'X-Container-Meta')

def get_container_object_meta(self, until=None):
    """Return the container's X-Container-Object-Meta* headers."""
    return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')

def set_container_meta(self, metapairs):
    """Update container metadata from a dict of key/value pairs."""
    assert(type(metapairs) is dict)
    self.container_post(update=True, metadata=metapairs).release()

def del_container_meta(self, metakey):
    """Delete a single container metadata key."""
    self.container_post(update=True, metadata={metakey: ''}).release()

def set_container_quota(self, quota):
    """Set the container quota policy."""
    self.container_post(update=True, quota=quota).release()

def set_container_versioning(self, versioning):
    """Set the container versioning policy."""
    self.container_post(update=True, versioning=versioning).release()
+
def del_object(self, obj, until=None, delimiter=None):
    """Delete object obj from the current container."""
    self.assert_container()
    self.object_delete(obj, until=until, delimiter=delimiter).release()

def set_object_meta(self, object, metapairs):
    """Update an object's metadata from a dict of key/value pairs."""
    assert(type(metapairs) is dict)
    self.object_post(object, update=True, metadata=metapairs).release()

def del_object_meta(self, metakey, object):
    """Delete a single metadata key of an object."""
    self.object_post(object, update=True, metadata={metakey: ''}).release()

def publish_object(self, object):
    """Make an object publicly accessible."""
    self.object_post(object, update=True, public=True).release()

def unpublish_object(self, object):
    """Revoke an object's public accessibility."""
    self.object_post(object, update=True, public=False).release()
+
def get_object_info(self, obj, version=None):
    """Return the headers of object obj (optionally a given version)."""
    return self.object_head(obj, version=version).headers

def get_object_meta(self, obj, version=None):
    """Return the object's X-Object-Meta* headers."""
    return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
+
def get_object_sharing(self, object):
    """Return an object's sharing directives as a {key: value} dict.

    The X-Object-Sharing header carries ';'-separated 'key=value' pairs.

    :raises ClientError: if a pair carries no '=' sign
    """
    r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch=True)
    reply = {}
    if r:
        perms = r['x-object-sharing'].split(';')
        for perm in perms:
            if '=' not in perm:
                raise ClientError('Incorrect reply format')
            # split on the first '=' only, so values that themselves
            # contain '=' no longer break the 2-tuple unpacking
            (key, val) = perm.strip().split('=', 1)
            reply[key] = val
    return reply
+
def set_object_sharing(self, object, read_permition = False, write_permition = False):
    """Give read/write permissions to an object.

    :param object: the object whose sharing permissions are (re)set
    :param read_permition: list of users/user groups granted read access;
        False removes all previous read permissions
    :param write_permition: list of users/user groups granted write access;
        False removes all previous write permissions
    """
    perms = dict(
        read=read_permition if isinstance(read_permition, list) else '',
        write=write_permition if isinstance(write_permition, list) else '')
    self.object_post(object, update=True, permitions=perms).release()

def del_object_sharing(self, object):
    """Remove every sharing permission from an object."""
    self.set_object_sharing(object)
+
def append_object(self, object, source_file, upload_cb = None):
    """Append the contents of source_file to a remote object, block by block.

    :param upload_cb: progress generator factory; called with the number
        of blocks, then its next is invoked once per uploaded block
    """
    self.assert_container()
    blocksize = int(self.get_container_info()['x-container-block-size'])
    filesize = fstat(source_file.fileno()).st_size
    nblocks = 1 + (filesize - 1)//blocksize
    upload_gen = upload_cb(nblocks) if upload_cb is not None else None
    offset = 0
    for i in range(nblocks):
        block = source_file.read(min(blocksize, filesize - offset))
        offset += len(block)
        self.object_post(object, update=True, content_range='bytes */*',
            content_type='application/octet-stream', content_length=len(block),
            data=block).release()
        if upload_gen is not None:
            upload_gen.next()
+
def truncate_object(self, object, upto_bytes):
    """Truncate a remote object down to upto_bytes bytes."""
    self.object_post(object, update=True,
        content_range='bytes 0-%s/*'%upto_bytes,
        content_type='application/octet-stream',
        object_bytes=upto_bytes,
        source_object=path4url(self.container, object)).release()
+
def overwrite_object(self, object, start, end, source_file, upload_cb=None):
    """Overwrite a part of an object with data from the given source file.

    :param start: first byte of the remote object to overwrite
    :param end: last byte of the remote object to overwrite
    :param upload_cb: progress generator factory; its next is invoked
        once per uploaded block
    """
    self.assert_container()
    blocksize = int(self.get_container_info()['x-container-block-size'])
    filesize = fstat(source_file.fileno()).st_size
    datasize = int(end) - int(start) + 1
    nblocks = 1 + (datasize - 1)//blocksize
    upload_gen = upload_cb(nblocks) if upload_cb is not None else None
    offset = 0
    for i in range(nblocks):
        block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
        offset += len(block)
        self.object_post(object, update=True,
            content_type='application/octet-stream', content_length=len(block),
            content_range='bytes %s-%s/*'%(start,end), data=block).release()
        if upload_gen is not None:
            upload_gen.next()
def copy_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version = None, public=False, content_type=None, delimiter=None):
    """Server-side copy of src_container/src_object to dst_container/dst_object.

    :param dst_object: destination name; defaults to the source name
    """
    self.assert_account()
    self.container = dst_container
    self.object_put(dst_object or src_object, success=201,
        copy_from=path4url(src_container, src_object), content_length=0,
        source_version=source_version, public=public,
        content_type=content_type, delimiter=delimiter).release()

def move_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version = None, public=False, content_type=None, delimiter=None):
    """Server-side move of src_container/src_object to dst_container/dst_object.

    :param dst_object: destination name; defaults to the source name
    """
    self.assert_account()
    self.container = dst_container
    self.object_put(dst_object or src_object, success=201,
        move_from=path4url(src_container, src_object), content_length=0,
        source_version=source_version, public=public,
        content_type=content_type, delimiter=delimiter).release()
+
def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
    """List the accounts that share objects with self.account."""
    self.assert_account()

    self.set_param('format', 'json')
    self.set_param('limit', limit, iff=limit is not None)
    self.set_param('marker', marker, iff=marker is not None)

    success = kwargs.pop('success', (200, 204))
    return self.get('', *args, success=success, **kwargs).json
+
def get_object_versionlist(self, path):
    """Return the list of versions of the object at path."""
    self.assert_container()
    return self.object_get(path, format='json', version='list').json['versions']