X-Git-Url: https://code.grnet.gr/git/kamaki/blobdiff_plain/524d9cdd526d855065d82a7660790cc80b134869..feature-input-output-encoding:/kamaki/clients/pithos/__init__.py diff --git a/kamaki/clients/pithos/__init__.py b/kamaki/clients/pithos/__init__.py index 5d6dbe7..4eb7153 100644 --- a/kamaki/clients/pithos/__init__.py +++ b/kamaki/clients/pithos/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2011-2013 GRNET S.A. All rights reserved. +# Copyright 2011-2014 GRNET S.A. All rights reserved. # # Redistribution and use in source and binary forms, with or # without modification, are permitted provided that the following @@ -36,14 +36,14 @@ from threading import enumerate as activethreads from os import fstat from hashlib import new as newhashlib from time import time +from StringIO import StringIO from binascii import hexlify from kamaki.clients import SilentEvent, sendlog from kamaki.clients.pithos.rest_api import PithosRestClient from kamaki.clients.storage import ClientError -from kamaki.clients.utils import path4url, filter_in -from StringIO import StringIO +from kamaki.clients.utils import path4url, filter_in, readall def _pithos_hash(block, blockhash): @@ -52,35 +52,95 @@ def _pithos_hash(block, blockhash): return h.hexdigest() -def _range_up(start, end, a_range): - if a_range: - (rstart, rend) = a_range.split('-') - (rstart, rend) = (int(rstart), int(rend)) - if rstart > end or rend < start: - return (0, 0) - if rstart > start: - start = rstart - if rend < end: - end = rend - return (start, end) +def _range_up(start, end, max_value, a_range): + """ + :param start: (int) the window bottom + + :param end: (int) the window top + + :param max_value: (int) maximum accepted value + + :param a_range: (str) a range string in the form X[,X'[,X''[...]]] + where X: x|x-y|-x where x < y and x, y natural numbers + + :returns: (str) a range string cut-off for the start-end range + an empty response means this window is out of range + """ + assert start >= 0, '_range_up called w. start(%s) < 0' % start + assert end >= start, '_range_up called w. end(%s) < start(%s)' % ( + end, start) + assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % ( + max_value, end) + if not a_range: + return '%s-%s' % (start, end) + selected = [] + for some_range in a_range.split(','): + v0, sep, v1 = some_range.partition('-') + if v0: + v0 = int(v0) + if sep: + v1 = int(v1) + if v1 < start or v0 > end or v1 < v0: + continue + v0 = v0 if v0 > start else start + v1 = v1 if v1 < end else end + selected.append('%s-%s' % (v0, v1)) + elif v0 < start: + continue + else: + v1 = v0 if v0 <= end else end + selected.append('%s-%s' % (start, v1)) + else: + v1 = int(v1) + if max_value - v1 > end: + continue + v0 = (max_value - v1) if max_value - v1 > start else start + selected.append('%s-%s' % (v0, end)) + return ','.join(selected) class PithosClient(PithosRestClient): - """GRNet Pithos API client""" + """Synnefo Pithos+ API client""" def __init__(self, base_url, token, account=None, container=None): super(PithosClient, self).__init__(base_url, token, account, container) - def purge_container(self, container=None): - """Delete an empty container and destroy associated blocks + def create_container( + self, + container=None, sizelimit=None, versioning=None, metadata=None, + **kwargs): + """ + :param container: (str) if not given, self.container is used instead + + :param sizelimit: (int) container total size limit in bytes + + :param versioning: (str) can be auto or whatever supported by server + + :param metadata: (dict) Custom user-defined metadata of the form + { 'name1': 'value1', 'name2': 'value2', ... } + + :returns: (dict) response headers """ cnt_back_up = self.container try: self.container = container or cnt_back_up - self.container_delete(until=unicode(time())) + r = self.container_put( + quota=sizelimit, versioning=versioning, metadata=metadata, + **kwargs) + return r.headers finally: self.container = cnt_back_up + def purge_container(self, container=None): + """Delete an empty container and destroy associated blocks""" + cnt_back_up = self.container + try: + self.container = container or cnt_back_up + r = self.container_delete(until=unicode(time())) + finally: + self.container = cnt_back_up + return r.headers + def upload_object_unchunked( self, obj, f, withHashFile=False, @@ -112,6 +172,8 @@ class PithosClient(PithosRestClient): 'write':[usr and/or grp names]} :param public: (bool) + + :returns: (dict) created object metadata """ self._assert_container() @@ -127,8 +189,8 @@ class PithosClient(PithosRestClient): raise ClientError(msg, 1) f = StringIO(data) else: - data = f.read(size) if size else f.read() - self.object_put( + data = readall(f, size) if size else f.read() + r = self.object_put( obj, data=data, etag=etag, @@ -138,6 +200,7 @@ class PithosClient(PithosRestClient): permissions=sharing, public=public, success=201) + return r.headers def create_object_by_manifestation( self, obj, @@ -162,9 +225,11 @@ class PithosClient(PithosRestClient): 'write':[usr and/or grp names]} :param public: (bool) + + :returns: (dict) created object metadata """ self._assert_container() - self.object_put( + r = self.object_put( obj, content_length=0, etag=etag, @@ -174,9 +239,10 @@ class PithosClient(PithosRestClient): permissions=sharing, public=public, manifest='%s/%s' % (self.container, obj)) + return r.headers # upload_* auxiliary methods - def _put_block_async(self, data, hash, upload_gen=None): + def _put_block_async(self, data, hash): event = SilentEvent(method=self._put_block, data=data, hash=hash) event.start() return event @@ -190,20 +256,37 @@ class PithosClient(PithosRestClient): format='json') assert r.json[0] == hash, 'Local hash does not match server' - def _get_file_block_info(self, fileobj, size=None): - meta = self.get_container_info() + def _get_file_block_info(self, fileobj, size=None, cache=None): + """ + :param fileobj: (file descriptor) source + + :param size: (int) size of data to upload from source + + :param cache: (dict) if provided, cache container info response to + avoid redundant calls + """ + if isinstance(cache, dict): + try: + meta = cache[self.container] + except KeyError: + meta = self.get_container_info() + cache[self.container] = meta + else: + meta = self.get_container_info() blocksize = int(meta['x-container-block-size']) blockhash = meta['x-container-block-hash'] size = size if size is not None else fstat(fileobj.fileno()).st_size nblocks = 1 + (size - 1) // blocksize return (blocksize, blockhash, size, nblocks) - def _get_missing_hashes( + def _create_object_or_get_missing_hashes( self, obj, json, size=None, format='json', hashmap=True, content_type=None, + if_etag_match=None, + if_etag_not_match=None, content_encoding=None, content_disposition=None, permissions=None, @@ -215,14 +298,16 @@ class PithosClient(PithosRestClient): hashmap=True, content_type=content_type, json=json, + if_etag_match=if_etag_match, + if_etag_not_match=if_etag_not_match, content_encoding=content_encoding, content_disposition=content_disposition, permissions=permissions, public=public, success=success) - return None if r.status_code == 201 else r.json + return (None if r.status_code == 201 else r.json), r.headers - def _culculate_blocks_for_upload( + def _calculate_blocks_for_upload( self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj, hash_cb=None): offset = 0 @@ -230,17 +315,19 @@ class PithosClient(PithosRestClient): hash_gen = hash_cb(nblocks) hash_gen.next() - for i in range(nblocks): - block = fileobj.read(min(blocksize, size - offset)) + for i in xrange(nblocks): + block = readall(fileobj, min(blocksize, size - offset)) bytes = len(block) + if bytes <= 0: + break hash = _pithos_hash(block, blockhash) hashes.append(hash) hmap[hash] = (offset, bytes) offset += bytes if hash_cb: hash_gen.next() - msg = 'Failed to calculate uploaded blocks:' - ' Offset and object size do not match' + msg = ('Failed to calculate uploading blocks: ' + 'read bytes(%s) != requested size (%s)' % (offset, size)) assert offset == size, msg def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None): @@ -253,8 +340,8 @@ class PithosClient(PithosRestClient): for hash in missing: offset, bytes = hmap[hash] fileobj.seek(offset) - data = fileobj.read(bytes) - r = self._put_block_async(data, hash, upload_gen) + data = readall(fileobj, bytes) + r = self._put_block_async(data, hash) flying.append(r) unfinished = self._watch_thread_limit(flying) for thread in set(flying).difference(unfinished): @@ -291,12 +378,14 @@ class PithosClient(PithosRestClient): hash_cb=None, upload_cb=None, etag=None, + if_etag_match=None, if_not_exist=None, content_encoding=None, content_disposition=None, content_type=None, sharing=None, - public=None): + public=None, + container_info_cache=None): """Upload an object using multiple connections (threads) :param obj: (str) remote object path @@ -309,6 +398,9 @@ class PithosClient(PithosRestClient): :param etag: (str) + :param if_etag_match: (str) Push that value to if-match header at file + creation + :param if_not_exist: (bool) If true, the file will be uploaded ONLY if it does not exist remotely, otherwise the operation will fail. Involves the case of an object with the same path is created while @@ -324,17 +416,19 @@ class PithosClient(PithosRestClient): 'write':[usr and/or grp names]} :param public: (bool) + + :param container_info_cache: (dict) if given, avoid redundant calls to + server for container info (block size and hash information) """ self._assert_container() - #init - block_info = (blocksize, blockhash, size, nblocks) =\ - self._get_file_block_info(f, size) + block_info = ( + blocksize, blockhash, size, nblocks) = self._get_file_block_info( + f, size, container_info_cache) (hashes, hmap, offset) = ([], {}, 0) - if not content_type: - content_type = 'application/octet-stream' + content_type = content_type or 'application/octet-stream' - self._culculate_blocks_for_upload( + self._calculate_blocks_for_upload( *block_info, hashes=hashes, hmap=hmap, @@ -342,17 +436,19 @@ class PithosClient(PithosRestClient): hash_cb=hash_cb) hashmap = dict(bytes=size, hashes=hashes) - missing = self._get_missing_hashes( + missing, obj_headers = self._create_object_or_get_missing_hashes( obj, hashmap, content_type=content_type, size=size, + if_etag_match=if_etag_match, + if_etag_not_match='*' if if_not_exist else None, content_encoding=content_encoding, content_disposition=content_disposition, permissions=sharing, public=public) if missing is None: - return + return obj_headers if upload_cb: upload_gen = upload_cb(len(missing)) @@ -370,10 +466,7 @@ class PithosClient(PithosRestClient): sendlog.info('%s blocks missing' % len(missing)) num_of_blocks = len(missing) missing = self._upload_missing_blocks( - missing, - hmap, - f, - upload_gen) + missing, hmap, f, upload_gen) if missing: if num_of_blocks == len(missing): retries -= 1 @@ -382,24 +475,168 @@ class PithosClient(PithosRestClient): else: break if missing: + try: + details = ['%s' % thread.exception for thread in missing] + except Exception: + details = ['Also, failed to read thread exceptions'] raise ClientError( '%s blocks failed to upload' % len(missing), - status=800) + details=details) except KeyboardInterrupt: sendlog.info('- - - wait for threads to finish') for thread in activethreads(): thread.join() raise - self.object_put( + r = self.object_put( obj, format='json', hashmap=True, content_type=content_type, + content_encoding=content_encoding, + if_etag_match=if_etag_match, if_etag_not_match='*' if if_not_exist else None, etag=etag, json=hashmap, + permissions=sharing, + public=public, success=201) + return r.headers + + def upload_from_string( + self, obj, input_str, + hash_cb=None, + upload_cb=None, + etag=None, + if_etag_match=None, + if_not_exist=None, + content_encoding=None, + content_disposition=None, + content_type=None, + sharing=None, + public=None, + container_info_cache=None): + """Upload an object using multiple connections (threads) + + :param obj: (str) remote object path + + :param input_str: (str) upload content + + :param hash_cb: optional progress.bar object for calculating hashes + + :param upload_cb: optional progress.bar object for uploading + + :param etag: (str) + + :param if_etag_match: (str) Push that value to if-match header at file + creation + + :param if_not_exist: (bool) If true, the file will be uploaded ONLY if + it does not exist remotely, otherwise the operation will fail. + Involves the case of an object with the same path is created while + the object is being uploaded. + + :param content_encoding: (str) + + :param content_disposition: (str) + + :param content_type: (str) + + :param sharing: {'read':[user and/or grp names], + 'write':[usr and/or grp names]} + + :param public: (bool) + + :param container_info_cache: (dict) if given, avoid redundant calls to + server for container info (block size and hash information) + """ + self._assert_container() + + blocksize, blockhash, size, nblocks = self._get_file_block_info( + fileobj=None, size=len(input_str), cache=container_info_cache) + (hashes, hmap, offset) = ([], {}, 0) + if not content_type: + content_type = 'application/octet-stream' + + hashes = [] + hmap = {} + for blockid in range(nblocks): + start = blockid * blocksize + block = input_str[start: (start + blocksize)] + hashes.append(_pithos_hash(block, blockhash)) + hmap[hashes[blockid]] = (start, block) + + hashmap = dict(bytes=size, hashes=hashes) + missing, obj_headers = self._create_object_or_get_missing_hashes( + obj, hashmap, + content_type=content_type, + size=size, + if_etag_match=if_etag_match, + if_etag_not_match='*' if if_not_exist else None, + content_encoding=content_encoding, + content_disposition=content_disposition, + permissions=sharing, + public=public) + if missing is None: + return obj_headers + num_of_missing = len(missing) + + if upload_cb: + self.progress_bar_gen = upload_cb(nblocks) + for i in range(nblocks + 1 - num_of_missing): + self._cb_next() + + tries = 7 + old_failures = 0 + try: + while tries and missing: + flying = [] + failures = [] + for hash in missing: + offset, block = hmap[hash] + bird = self._put_block_async(block, hash) + flying.append(bird) + unfinished = self._watch_thread_limit(flying) + for thread in set(flying).difference(unfinished): + if thread.exception: + failures.append(thread.kwargs['hash']) + if thread.isAlive(): + flying.append(thread) + else: + self._cb_next() + flying = unfinished + for thread in flying: + thread.join() + if thread.exception: + failures.append(thread.kwargs['hash']) + self._cb_next() + missing = failures + if missing and len(missing) == old_failures: + tries -= 1 + old_failures = len(missing) + if missing: + raise ClientError('%s blocks failed to upload' % len(missing)) + except KeyboardInterrupt: + sendlog.info('- - - wait for threads to finish') + for thread in activethreads(): + thread.join() + raise + self._cb_next() + + r = self.object_put( + obj, + format='json', + hashmap=True, + content_type=content_type, + content_encoding=content_encoding, + if_etag_match=if_etag_match, + if_etag_not_match='*' if if_not_exist else None, + etag=etag, + json=hashmap, + permissions=sharing, + public=public, + success=201) + return r.headers # download_* auxiliary methods def _get_remote_blocks_info(self, obj, **restargs): @@ -421,15 +658,20 @@ class PithosClient(PithosRestClient): return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict) def _dump_blocks_sync( - self, obj, remote_hashes, blocksize, total_size, dst, range, + self, obj, remote_hashes, blocksize, total_size, dst, crange, **args): + if not total_size: + return for blockid, blockhash in enumerate(remote_hashes): if blockhash: start = blocksize * blockid is_last = start + blocksize > total_size end = (total_size - 1) if is_last else (start + blocksize - 1) - (start, end) = _range_up(start, end, range) - args['data_range'] = 'bytes=%s-%s' % (start, end) + data_range = _range_up(start, end, total_size, crange) + if not data_range: + self._cb_next() + continue + args['data_range'] = 'bytes=%s' % data_range r = self.object_get(obj, success=(200, 206), **args) self._cb_next() dst.write(r.content) @@ -442,7 +684,7 @@ class PithosClient(PithosRestClient): def _hash_from_file(self, fp, start, size, blockhash): fp.seek(start) - block = fp.read(size) + block = readall(fp, size) h = newhashlib(blockhash) h.update(block.strip('\x00')) return hexlify(h.digest()) @@ -454,7 +696,7 @@ class PithosClient(PithosRestClient): - e.g. if the range is 10-100, all blocks will be written to normal_position - 10 """ - for i, (key, g) in enumerate(flying.items()): + for key, g in flying.items(): if g.isAlive(): continue if g.exception: @@ -475,9 +717,6 @@ class PithosClient(PithosRestClient): flying = dict() blockid_dict = dict() offset = 0 - if filerange is not None: - rstart = int(filerange.split('-')[0]) - offset = rstart if blocksize > rstart else rstart % blocksize self._init_thread_limit() for block_hash, blockids in remote_hashes.items(): @@ -492,14 +731,17 @@ class PithosClient(PithosRestClient): self._thread2file( flying, blockid_dict, local_file, offset, **restargs) - end = total_size - 1 if key + blocksize > total_size\ - else key + blocksize - 1 - start, end = _range_up(key, end, filerange) - if start == end: + end = total_size - 1 if ( + key + blocksize > total_size) else key + blocksize - 1 + if end < key: + self._cb_next() + continue + data_range = _range_up(key, end, total_size, filerange) + if not data_range: self._cb_next() continue - restargs['async_headers'] = { - 'Range': 'bytes=%s-%s' % (start, end)} + restargs[ + 'async_headers'] = {'Range': 'bytes=%s' % data_range} flying[key] = self._get_block_async(obj, **restargs) blockid_dict[key] = unsaved @@ -583,6 +825,86 @@ class PithosClient(PithosRestClient): self._complete_cb() + def download_to_string( + self, obj, + download_cb=None, + version=None, + range_str=None, + if_match=None, + if_none_match=None, + if_modified_since=None, + if_unmodified_since=None): + """Download an object to a string (multiple connections). This method + uses threads for http requests, but stores all content in memory. + + :param obj: (str) remote object path + + :param download_cb: optional progress.bar object for downloading + + :param version: (str) file version + + :param range_str: (str) from, to are file positions (int) in bytes + + :param if_match: (str) + + :param if_none_match: (str) + + :param if_modified_since: (str) formated date + + :param if_unmodified_since: (str) formated date + + :returns: (str) the whole object contents + """ + restargs = dict( + version=version, + data_range=None if range_str is None else 'bytes=%s' % range_str, + if_match=if_match, + if_none_match=if_none_match, + if_modified_since=if_modified_since, + if_unmodified_since=if_unmodified_since) + + ( + blocksize, + blockhash, + total_size, + hash_list, + remote_hashes) = self._get_remote_blocks_info(obj, **restargs) + assert total_size >= 0 + + if download_cb: + self.progress_bar_gen = download_cb(len(hash_list)) + self._cb_next() + + num_of_blocks = len(remote_hashes) + ret = [''] * num_of_blocks + self._init_thread_limit() + flying = dict() + try: + for blockid, blockhash in enumerate(remote_hashes): + start = blocksize * blockid + is_last = start + blocksize > total_size + end = (total_size - 1) if is_last else (start + blocksize - 1) + data_range_str = _range_up(start, end, end, range_str) + if data_range_str: + self._watch_thread_limit(flying.values()) + restargs['data_range'] = 'bytes=%s' % data_range_str + flying[blockid] = self._get_block_async(obj, **restargs) + for runid, thread in flying.items(): + if (blockid + 1) == num_of_blocks: + thread.join() + elif thread.isAlive(): + continue + if thread.exception: + raise thread.exception + ret[runid] = thread.value.content + self._cb_next() + flying.pop(runid) + return ''.join(ret) + except KeyboardInterrupt: + sendlog.info('- - - wait for threads to finish') + for thread in activethreads(): + thread.join() + #Command Progress Bar method def _cb_next(self, step=1): if hasattr(self, 'progress_bar_gen'): @@ -605,8 +927,7 @@ class PithosClient(PithosRestClient): if_match=None, if_none_match=None, if_modified_since=None, - if_unmodified_since=None, - data_range=None): + if_unmodified_since=None): """ :param obj: (str) remote object path @@ -618,9 +939,6 @@ class PithosClient(PithosRestClient): :param if_unmodified_since: (str) formated date - :param data_range: (str) from-to where from and to are integers - denoting file positions in bytes - :returns: (list) """ try: @@ -631,8 +949,7 @@ class PithosClient(PithosRestClient): if_etag_match=if_match, if_etag_not_match=if_none_match, if_modified_since=if_modified_since, - if_unmodified_since=if_unmodified_since, - data_range=data_range) + if_unmodified_since=if_unmodified_since) except ClientError as err: if err.status == 304 or err.status == 412: return {} @@ -645,7 +962,8 @@ class PithosClient(PithosRestClient): :param usernames: (list) """ - self.account_post(update=True, groups={group: usernames}) + r = self.account_post(update=True, groups={group: usernames}) + return r def del_account_group(self, group): """ @@ -673,18 +991,18 @@ class PithosClient(PithosRestClient): 'X-Account-Policy-Quota', exactMatch=True) - def get_account_versioning(self): - """ - :returns: (dict) - """ - return filter_in( - self.get_account_info(), - 'X-Account-Policy-Versioning', - exactMatch=True) + #def get_account_versioning(self): + # """ + # :returns: (dict) + # """ + # return filter_in( + # self.get_account_info(), + # 'X-Account-Policy-Versioning', + # exactMatch=True) def get_account_meta(self, until=None): """ - :meta until: (str) formated date + :param until: (str) formated date :returns: (dict) """ @@ -701,25 +1019,28 @@ class PithosClient(PithosRestClient): :param metapairs: (dict) {key1:val1, key2:val2, ...} """ assert(type(metapairs) is dict) - self.account_post(update=True, metadata=metapairs) + r = self.account_post(update=True, metadata=metapairs) + return r.headers def del_account_meta(self, metakey): """ :param metakey: (str) metadatum key """ - self.account_post(update=True, metadata={metakey: ''}) + r = self.account_post(update=True, metadata={metakey: ''}) + return r.headers - def set_account_quota(self, quota): - """ - :param quota: (int) - """ - self.account_post(update=True, quota=quota) + #def set_account_quota(self, quota): + # """ + # :param quota: (int) + # """ + # self.account_post(update=True, quota=quota) - def set_account_versioning(self, versioning): - """ - "param versioning: (str) - """ - self.account_post(update=True, versioning=versioning) + #def set_account_versioning(self, versioning): + # """ + # :param versioning: (str) + # """ + # r = self.account_post(update=True, versioning=versioning) + # return r.headers def list_containers(self): """ @@ -751,6 +1072,7 @@ class PithosClient(PithosRestClient): raise ClientError( 'Container "%s" is not empty' % self.container, r.status_code) + return r.headers def get_container_versioning(self, container=None): """ @@ -767,7 +1089,7 @@ class PithosClient(PithosRestClient): finally: self.container = cnt_back_up - def get_container_quota(self, container=None): + def get_container_limit(self, container=None): """ :param container: (str) @@ -782,7 +1104,7 @@ class PithosClient(PithosRestClient): finally: self.container = cnt_back_up - def get_container_info(self, until=None): + def get_container_info(self, container=None, until=None): """ :param until: (str) formated date @@ -790,11 +1112,16 @@ class PithosClient(PithosRestClient): :raises ClientError: 404 Container not found """ + bck_cont = self.container try: + self.container = container or bck_cont + self._assert_container() r = self.container_head(until=until) except ClientError as err: err.details.append('for container %s' % self.container) raise err + finally: + self.container = bck_cont return r.headers def get_container_meta(self, until=None): @@ -804,8 +1131,7 @@ class PithosClient(PithosRestClient): :returns: (dict) """ return filter_in( - self.get_container_info(until=until), - 'X-Container-Meta') + self.get_container_info(until=until), 'X-Container-Meta') def get_container_object_meta(self, until=None): """ @@ -814,33 +1140,38 @@ class PithosClient(PithosRestClient): :returns: (dict) """ return filter_in( - self.get_container_info(until=until), - 'X-Container-Object-Meta') + self.get_container_info(until=until), 'X-Container-Object-Meta') def set_container_meta(self, metapairs): """ :param metapairs: (dict) {key1:val1, key2:val2, ...} """ assert(type(metapairs) is dict) - self.container_post(update=True, metadata=metapairs) + r = self.container_post(update=True, metadata=metapairs) + return r.headers def del_container_meta(self, metakey): """ :param metakey: (str) metadatum key + + :returns: (dict) response headers """ - self.container_post(update=True, metadata={metakey: ''}) + r = self.container_post(update=True, metadata={metakey: ''}) + return r.headers - def set_container_quota(self, quota): + def set_container_limit(self, limit): """ - :param quota: (int) + :param limit: (int) """ - self.container_post(update=True, quota=quota) + r = self.container_post(update=True, quota=limit) + return r.headers def set_container_versioning(self, versioning): """ :param versioning: (str) """ - self.container_post(update=True, versioning=versioning) + r = self.container_post(update=True, versioning=versioning) + return r.headers def del_object(self, obj, until=None, delimiter=None): """ @@ -851,7 +1182,8 @@ class PithosClient(PithosRestClient): :param delimiter: (str) """ self._assert_container() - self.object_delete(obj, until=until, delimiter=delimiter) + r = self.object_delete(obj, until=until, delimiter=delimiter) + return r.headers def set_object_meta(self, obj, metapairs): """ @@ -860,7 +1192,8 @@ class PithosClient(PithosRestClient): :param metapairs: (dict) {key1:val1, key2:val2, ...} """ assert(type(metapairs) is dict) - self.object_post(obj, update=True, metadata=metapairs) + r = self.object_post(obj, update=True, metadata=metapairs) + return r.headers def del_object_meta(self, obj, metakey): """ @@ -868,7 +1201,8 @@ class PithosClient(PithosRestClient): :param metakey: (str) metadatum key """ - self.object_post(obj, update=True, metadata={metakey: ''}) + r = self.object_post(obj, update=True, metadata={metakey: ''}) + return r.headers def publish_object(self, obj): """ @@ -878,6 +1212,7 @@ class PithosClient(PithosRestClient): """ self.object_post(obj, update=True, public=True) info = self.get_object_info(obj) + return info['x-object-public'] pref, sep, rest = self.base_url.partition('//') base = rest.split('/')[0] return '%s%s%s/%s' % (pref, sep, base, info['x-object-public']) @@ -886,7 +1221,8 @@ class PithosClient(PithosRestClient): """ :param obj: (str) remote object path """ - self.object_post(obj, update=True, public=False) + r = self.object_post(obj, update=True, public=False) + return r.headers def get_object_info(self, obj, version=None): """ @@ -940,28 +1276,31 @@ class PithosClient(PithosRestClient): def set_object_sharing( self, obj, - read_permition=False, write_permition=False): + read_permission=False, write_permission=False): """Give read/write permisions to an object. :param obj: (str) remote object path - :param read_permition: (list - bool) users and user groups that get - read permition for this object - False means all previous read + :param read_permission: (list - bool) users and user groups that get + read permission for this object - False means all previous read permissions will be removed - :param write_perimition: (list - bool) of users and user groups to get - write permition for this object - False means all previous write + :param write_permission: (list - bool) of users and user groups to get + write permission for this object - False means all previous write permissions will be removed + + :returns: (dict) response headers """ - perms = dict(read=read_permition or '', write=write_permition or '') - self.object_post(obj, update=True, permissions=perms) + perms = dict(read=read_permission or '', write=write_permission or '') + r = self.object_post(obj, update=True, permissions=perms) + return r.headers def del_object_sharing(self, obj): """ :param obj: (str) remote object path """ - self.set_object_sharing(obj) + return self.set_object_sharing(obj) def append_object(self, obj, source_file, upload_cb=None): """ @@ -971,46 +1310,79 @@ class PithosClient(PithosRestClient): :param upload_db: progress.bar for uploading """ - self._assert_container() meta = self.get_container_info() blocksize = int(meta['x-container-block-size']) filesize = fstat(source_file.fileno()).st_size nblocks = 1 + (filesize - 1) // blocksize offset = 0 + headers = {} if upload_cb: - upload_gen = upload_cb(nblocks) - upload_gen.next() - for i in range(nblocks): - block = source_file.read(min(blocksize, filesize - offset)) - offset += len(block) - self.object_post( - obj, - update=True, - content_range='bytes */*', - content_type='application/octet-stream', - content_length=len(block), - data=block) + self.progress_bar_gen = upload_cb(nblocks) + self._cb_next() + flying = {} + self._init_thread_limit() + try: + for i in range(nblocks): + block = source_file.read(min(blocksize, filesize - offset)) + offset += len(block) - if upload_cb: - upload_gen.next() + self._watch_thread_limit(flying.values()) + unfinished = {} + flying[i] = SilentEvent( + method=self.object_post, + obj=obj, + update=True, + content_range='bytes */*', + content_type='application/octet-stream', + content_length=len(block), + data=block) + flying[i].start() + + for key, thread in flying.items(): + if thread.isAlive(): + if i < nblocks: + unfinished[key] = thread + continue + thread.join() + if thread.exception: + raise thread.exception + headers[key] = thread.value.headers + self._cb_next() + flying = unfinished + except KeyboardInterrupt: + sendlog.info('- - - wait for threads to finish') + for thread in activethreads(): + thread.join() + finally: + from time import sleep + sleep(2 * len(activethreads())) + self._cb_next() + return headers.values() def truncate_object(self, obj, upto_bytes): """ :param obj: (str) remote object path :param upto_bytes: max number of bytes to leave on file + + :returns: (dict) response headers """ - self.object_post( + ctype = self.get_object_info(obj)['content-type'] + r = self.object_post( obj, update=True, content_range='bytes 0-%s/*' % upto_bytes, - content_type='application/octet-stream', + content_type=ctype, object_bytes=upto_bytes, source_object=path4url(self.container, obj)) + return r.headers - def overwrite_object(self, obj, start, end, source_file, upload_cb=None): + def overwrite_object( + self, obj, start, end, source_file, + source_version=None, upload_cb=None): """Overwrite a part of an object from local source file + ATTENTION: content_type must always be application/octet-stream :param obj: (str) remote object path @@ -1023,30 +1395,26 @@ class PithosClient(PithosRestClient): :param upload_db: progress.bar for uploading """ - r = self.get_object_info(obj) - rf_size = int(r['content-length']) - if rf_size < int(start): - raise ClientError( - 'Range start exceeds file size', - status=416) - elif rf_size < int(end): - raise ClientError( - 'Range end exceeds file size', - status=416) self._assert_container() + r = self.get_object_info(obj, version=source_version) + rf_size = int(r['content-length']) + start, end = int(start), int(end) + assert rf_size >= start, 'Range start %s exceeds file size %s' % ( + start, rf_size) meta = self.get_container_info() blocksize = int(meta['x-container-block-size']) filesize = fstat(source_file.fileno()).st_size - datasize = int(end) - int(start) + 1 + datasize = end - start + 1 nblocks = 1 + (datasize - 1) // blocksize offset = 0 if upload_cb: - upload_gen = upload_cb(nblocks) - upload_gen.next() + self.progress_bar_gen = upload_cb(nblocks) + self._cb_next() + headers = [] for i in range(nblocks): read_size = min(blocksize, filesize - offset, datasize - offset) block = source_file.read(read_size) - self.object_post( + r = self.object_post( obj, update=True, content_type='application/octet-stream', @@ -1054,11 +1422,13 @@ class PithosClient(PithosRestClient): content_range='bytes %s-%s/*' % ( start + offset, start + offset + len(block) - 1), + source_version=source_version, data=block) + headers.append(dict(r.headers)) offset += len(block) - - if upload_cb: - upload_gen.next() + self._cb_next() + self._cb_next() + return headers def copy_object( self, src_container, src_object, dst_container, @@ -1086,11 +1456,13 @@ class PithosClient(PithosRestClient): :param content_type: (str) :param delimiter: (str) + + :returns: (dict) response headers """ self._assert_account() self.container = dst_container src_path = path4url(src_container, src_object) - self.object_put( + r = self.object_put( dst_object or src_object, success=201, copy_from=src_path, @@ -1100,6 +1472,7 @@ class PithosClient(PithosRestClient): public=public, content_type=content_type, delimiter=delimiter) + return r.headers def move_object( self, src_container, src_object, dst_container, @@ -1127,12 +1500,14 @@ class PithosClient(PithosRestClient): :param content_type: (str) :param delimiter: (str) + + :returns: (dict) response headers """ self._assert_account() self.container = dst_container dst_object = dst_object or src_object src_path = path4url(src_container, src_object) - self.object_put( + r = self.object_put( dst_object, success=201, move_from=src_path, @@ -1142,6 +1517,7 @@ class PithosClient(PithosRestClient): public=public, content_type=content_type, delimiter=delimiter) + return r.headers def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs): """Get accounts that share with self.account