from os import fstat
from hashlib import new as newhashlib
from time import time
+from StringIO import StringIO
from binascii import hexlify
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
-from StringIO import StringIO
def _pithos_hash(block, blockhash):
class PithosClient(PithosRestClient):
- """GRNet Pithos API client"""
+ """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
+        """Initialize by delegating everything to PithosRestClient."""
        super(PithosClient, self).__init__(base_url, token, account, container)
+    def create_container(
+            self,
+            container=None, sizelimit=None, versioning=None, metadata=None):
+        """
+        :param container: (str) if not given, self.container is used instead
+
+        :param sizelimit: (int) container total size limit in bytes
+
+        :param versioning: (str) can be auto or whatever supported by server
+
+        :param metadata: (dict) Custom user-defined metadata of the form
+            { 'name1': 'value1', 'name2': 'value2', ... }
+
+        :returns: (dict) response headers
+        """
+        cnt_back_up = self.container
+        try:
+            # Temporarily redirect self.container so container_put targets
+            # the requested container; always restore it, even on error.
+            self.container = container or cnt_back_up
+            r = self.container_put(
+                quota=sizelimit, versioning=versioning, metadata=metadata)
+            return r.headers
+        finally:
+            self.container = cnt_back_up
+
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
-            self.container_delete(until=unicode(time()))
+            # until=<current epoch time> so the purge covers everything up
+            # to this very moment
+            r = self.container_delete(until=unicode(time()))
        finally:
            # restore the original container even if the delete failed
            self.container = cnt_back_up
+        return r.headers
def upload_object_unchunked(
self, obj, f,
'write':[usr and/or grp names]}
:param public: (bool)
+
+ :returns: (dict) created object metadata
"""
self._assert_container()
f = StringIO(data)
else:
data = f.read(size) if size else f.read()
- self.object_put(
+ r = self.object_put(
obj,
data=data,
etag=etag,
permissions=sharing,
public=public,
success=201)
+ return r.headers
def create_object_by_manifestation(
self, obj,
'write':[usr and/or grp names]}
:param public: (bool)
+
+ :returns: (dict) created object metadata
"""
self._assert_container()
- self.object_put(
+ r = self.object_put(
obj,
content_length=0,
etag=etag,
permissions=sharing,
public=public,
manifest='%s/%s' % (self.container, obj))
+ return r.headers
# upload_* auxiliary methods
-    def _put_block_async(self, data, hash, upload_gen=None):
+    def _put_block_async(self, data, hash):
+        """Upload a single block in a background thread.
+
+        :returns: (SilentEvent) the already-started uploader thread
+        """
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event
format='json')
assert r.json[0] == hash, 'Local hash does not match server'
-    def _get_file_block_info(self, fileobj, size=None):
-        meta = self.get_container_info()
+    def _get_file_block_info(self, fileobj, size=None, cache=None):
+        """
+        :param fileobj: (file descriptor) source; may be None if size is
+            given explicitly (the caller supplies data from elsewhere,
+            e.g. a string)
+
+        :param size: (int) size of data to upload from source
+
+        :param cache: (dict) if provided, cache container info response to
+            avoid redundant calls
+
+        :returns: (blocksize, blockhash, size, nblocks) tuple
+        """
+        if isinstance(cache, dict):
+            try:
+                meta = cache[self.container]
+            except KeyError:
+                meta = self.get_container_info()
+                cache[self.container] = meta
+        else:
+            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
+        # ceil-style block count; note that size == 0 yields nblocks == 0
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
- def _get_missing_hashes(
+ def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
hashmap=True,
content_type=None,
+ if_etag_match=None,
+ if_etag_not_match=None,
content_encoding=None,
content_disposition=None,
permissions=None,
hashmap=True,
content_type=content_type,
json=json,
+ if_etag_match=if_etag_match,
+ if_etag_not_match=if_etag_not_match,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=permissions,
public=public,
success=success)
- return None if r.status_code == 201 else r.json
+ return (None if r.status_code == 201 else r.json), r.headers
- def _culculate_blocks_for_upload(
+ def _calculate_blocks_for_upload(
self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
hash_cb=None):
offset = 0
offset, bytes = hmap[hash]
fileobj.seek(offset)
data = fileobj.read(bytes)
- r = self._put_block_async(data, hash, upload_gen)
+ r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
hash_cb=None,
upload_cb=None,
etag=None,
+ if_etag_match=None,
if_not_exist=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
- public=None):
+ public=None,
+ container_info_cache=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
:param etag: (str)
+ :param if_etag_match: (str) Push that value to if-match header at file
+ creation
+
:param if_not_exist: (bool) If true, the file will be uploaded ONLY if
it does not exist remotely, otherwise the operation will fail.
Involves the case of an object with the same path is created while
'write':[usr and/or grp names]}
:param public: (bool)
+
+ :param container_info_cache: (dict) if given, avoid redundant calls to
+ server for container info (block size and hash information)
"""
self._assert_container()
- #init
- block_info = (blocksize, blockhash, size, nblocks) =\
- self._get_file_block_info(f, size)
+ block_info = (
+ blocksize, blockhash, size, nblocks) = self._get_file_block_info(
+ f, size, container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
if not content_type:
content_type = 'application/octet-stream'
- self._culculate_blocks_for_upload(
+ self._calculate_blocks_for_upload(
*block_info,
hashes=hashes,
hmap=hmap,
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
- missing = self._get_missing_hashes(
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=sharing,
public=public)
if missing is None:
- return
+ return obj_headers
if upload_cb:
upload_gen = upload_cb(len(missing))
if missing:
raise ClientError(
'%s blocks failed to upload' % len(missing),
- status=800)
+ details=['%s' % thread.exception for thread in missing])
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
- self.object_put(
+ r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
+ if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
json=hashmap,
permissions=sharing,
public=public,
success=201)
+ return r.headers
+
+    def upload_from_string(
+            self, obj, input_str,
+            hash_cb=None,
+            upload_cb=None,
+            etag=None,
+            if_etag_match=None,
+            if_not_exist=None,
+            content_encoding=None,
+            content_disposition=None,
+            content_type=None,
+            sharing=None,
+            public=None,
+            container_info_cache=None):
+        """Upload an object using multiple connections (threads)
+
+        :param obj: (str) remote object path
+
+        :param input_str: (str) upload content
+
+        :param hash_cb: optional progress.bar object for calculating hashes
+
+        :param upload_cb: optional progress.bar object for uploading
+
+        :param etag: (str)
+
+        :param if_etag_match: (str) Push that value to if-match header at file
+            creation
+
+        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+            it does not exist remotely, otherwise the operation will fail.
+            Involves the case of an object with the same path is created while
+            the object is being uploaded.
+
+        :param content_encoding: (str)
+
+        :param content_disposition: (str)
+
+        :param content_type: (str)
+
+        :param sharing: {'read':[user and/or grp names],
+            'write':[usr and/or grp names]}
+
+        :param public: (bool)
+
+        :param container_info_cache: (dict) if given, avoid redundant calls to
+            server for container info (block size and hash information)
+
+        :returns: (dict) created object headers
+        """
+        self._assert_container()
+
+        blocksize, blockhash, size, nblocks = self._get_file_block_info(
+            fileobj=None, size=len(input_str), cache=container_info_cache)
+        (hashes, hmap, offset) = ([], {}, 0)
+        if not content_type:
+            content_type = 'application/octet-stream'
+
+        # slice the input into blocks and hash each one locally
+        hashes = []
+        hmap = {}
+        for blockid in range(nblocks):
+            start = blockid * blocksize
+            block = input_str[start: (start + blocksize)]
+            hashes.append(_pithos_hash(block, blockhash))
+            hmap[hashes[blockid]] = (start, block)
+
+        hashmap = dict(bytes=size, hashes=hashes)
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
+            obj, hashmap,
+            content_type=content_type,
+            size=size,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            content_encoding=content_encoding,
+            content_disposition=content_disposition,
+            permissions=sharing,
+            public=public)
+        if missing is None:
+            # every block was already on the server; object is created
+            return obj_headers
+        num_of_missing = len(missing)
+
+        if upload_cb:
+            self.progress_bar_gen = upload_cb(nblocks)
+            # pre-advance the bar for blocks the server already has
+            # (presumably +1 for an initial tick — TODO confirm off-by-one)
+            for i in range(nblocks + 1 - num_of_missing):
+                self._cb_next()
+
+        tries = 7
+        old_failures = 0
+        try:
+            # retry failed blocks; 'tries' is decremented only when a round
+            # makes no progress (failure count unchanged)
+            while tries and missing:
+                flying = []
+                failures = []
+                for hash in missing:
+                    # (offset is unused below; hmap values are (start, block))
+                    offset, block = hmap[hash]
+                    bird = self._put_block_async(block, hash)
+                    flying.append(bird)
+                    unfinished = self._watch_thread_limit(flying)
+                    for thread in set(flying).difference(unfinished):
+                        if thread.exception:
+                            failures.append(thread.kwargs['hash'])
+                        if thread.isAlive():
+                            # NOTE(review): this append is discarded when
+                            # 'flying' is rebound to 'unfinished' below, so a
+                            # still-alive thread drops out of bookkeeping —
+                            # confirm.
+                            flying.append(thread)
+                        else:
+                            self._cb_next()
+                    flying = unfinished
+                for thread in flying:
+                    thread.join()
+                    if thread.exception:
+                        failures.append(thread.kwargs['hash'])
+                    self._cb_next()
+                missing = failures
+                if missing and len(missing) == old_failures:
+                    tries -= 1
+                old_failures = len(missing)
+            if missing:
+                # NOTE(review): 'missing' holds hash strings, so
+                # 'thread.exception' below raises AttributeError — the failed
+                # threads were probably intended here.
+                raise ClientError(
+                    '%s blocks failed to upload' % len(missing),
+                    details=['%s' % thread.exception for thread in missing])
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            raise
+
+        # all blocks are on the server: publish the hashmap as the object
+        r = self.object_put(
+            obj,
+            format='json',
+            hashmap=True,
+            content_type=content_type,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            etag=etag,
+            json=hashmap,
+            permissions=sharing,
+            public=public,
+            success=201)
+        return r.headers
# download_* auxiliary methods
def _get_remote_blocks_info(self, obj, **restargs):
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
- for i, (key, g) in enumerate(flying.items()):
+ for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
- end = total_size - 1 if key + blocksize > total_size\
- else key + blocksize - 1
+ end = total_size - 1 if (
+ key + blocksize > total_size) else key + blocksize - 1
start, end = _range_up(key, end, filerange)
if start == end:
self._cb_next()
self._complete_cb()
+    def download_to_string(
+            self, obj,
+            download_cb=None,
+            version=None,
+            range_str=None,
+            if_match=None,
+            if_none_match=None,
+            if_modified_since=None,
+            if_unmodified_since=None):
+        """Download an object to a string (multiple connections). This method
+        uses threads for http requests, but stores all content in memory.
+
+        :param obj: (str) remote object path
+
+        :param download_cb: optional progress.bar object for downloading
+
+        :param version: (str) file version
+
+        :param range_str: (str) from, to are file positions (int) in bytes
+
+        :param if_match: (str)
+
+        :param if_none_match: (str)
+
+        :param if_modified_since: (str) formatted date
+
+        :param if_unmodified_since: (str) formatted date
+
+        :returns: (str) the whole object contents
+        """
+        restargs = dict(
+            version=version,
+            data_range=None if range_str is None else 'bytes=%s' % range_str,
+            if_match=if_match,
+            if_none_match=if_none_match,
+            if_modified_since=if_modified_since,
+            if_unmodified_since=if_unmodified_since)
+
+        (
+            blocksize,
+            blockhash,
+            total_size,
+            hash_list,
+            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+        assert total_size >= 0
+
+        if download_cb:
+            self.progress_bar_gen = download_cb(len(hash_list))
+            self._cb_next()
+
+        num_of_blocks = len(remote_hashes)
+        # pre-sized result slots, one per block, joined at the end
+        ret = [''] * num_of_blocks
+        self._init_thread_limit()
+        flying = dict()
+        try:
+            # NOTE(review): the loop variable 'blockhash' shadows the
+            # container block hash unpacked above and is never used — confirm.
+            for blockid, blockhash in enumerate(remote_hashes):
+                start = blocksize * blockid
+                is_last = start + blocksize > total_size
+                end = (total_size - 1) if is_last else (start + blocksize - 1)
+                (start, end) = _range_up(start, end, range_str)
+                if start < end:
+                    self._watch_thread_limit(flying.values())
+                    # NOTE(review): restargs is not updated with this block's
+                    # byte range before the request — verify _get_block_async
+                    # actually targets the right block.
+                    flying[blockid] = self._get_block_async(obj, **restargs)
+                # .items() snapshots the dict in Python 2, so the pop() below
+                # is safe during iteration
+                for runid, thread in flying.items():
+                    if (blockid + 1) == num_of_blocks:
+                        thread.join()
+                    elif thread.isAlive():
+                        continue
+                    if thread.exception:
+                        raise thread.exception
+                    ret[runid] = thread.value.content
+                    self._cb_next()
+                    flying.pop(runid)
+            return ''.join(ret)
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            # NOTE(review): the interrupt is not re-raised here (unlike the
+            # upload paths), so the method falls through and returns None —
+            # confirm this is intended.
+
#Command Progress Bar method
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
:param usernames: (list)
"""
- self.account_post(update=True, groups={group: usernames})
+ r = self.account_post(update=True, groups={group: usernames})
+ return r
def del_account_group(self, group):
"""
'X-Account-Policy-Quota',
exactMatch=True)
- def get_account_versioning(self):
- """
- :returns: (dict)
- """
- return filter_in(
- self.get_account_info(),
- 'X-Account-Policy-Versioning',
- exactMatch=True)
+ #def get_account_versioning(self):
+ # """
+ # :returns: (dict)
+ # """
+ # return filter_in(
+ # self.get_account_info(),
+ # 'X-Account-Policy-Versioning',
+ # exactMatch=True)
def get_account_meta(self, until=None):
"""
- :meta until: (str) formated date
+ :param until: (str) formated date
:returns: (dict)
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.account_post(update=True, metadata=metapairs)
+ r = self.account_post(update=True, metadata=metapairs)
+ return r.headers
    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
+
+        :returns: (dict) response headers
        """
-        self.account_post(update=True, metadata={metakey: ''})
+        # deletion is requested by posting the key with an empty value
+        r = self.account_post(update=True, metadata={metakey: ''})
+        return r.headers
- def set_account_quota(self, quota):
- """
- :param quota: (int)
- """
- self.account_post(update=True, quota=quota)
+ #def set_account_quota(self, quota):
+ # """
+ # :param quota: (int)
+ # """
+ # self.account_post(update=True, quota=quota)
- def set_account_versioning(self, versioning):
- """
- "param versioning: (str)
- """
- self.account_post(update=True, versioning=versioning)
+ #def set_account_versioning(self, versioning):
+ # """
+ # :param versioning: (str)
+ # """
+ # r = self.account_post(update=True, versioning=versioning)
+ # return r.headers
def list_containers(self):
"""
raise ClientError(
'Container "%s" is not empty' % self.container,
r.status_code)
+ return r.headers
def get_container_versioning(self, container=None):
"""
finally:
self.container = cnt_back_up
- def get_container_quota(self, container=None):
+ def get_container_limit(self, container=None):
"""
:param container: (str)
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.container_post(update=True, metadata=metapairs)
+ r = self.container_post(update=True, metadata=metapairs)
+ return r.headers
    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
+
+        :returns: (dict) response headers
        """
-        self.container_post(update=True, metadata={metakey: ''})
+        # deletion is requested by posting the key with an empty value
+        r = self.container_post(update=True, metadata={metakey: ''})
+        return r.headers
-    def set_container_quota(self, quota):
+    def set_container_limit(self, limit):
        """
-        :param quota: (int)
+        :param limit: (int) new container total size limit, in bytes
+
+        :returns: (dict) response headers
        """
-        self.container_post(update=True, quota=quota)
+        # the REST layer still names this parameter 'quota'
+        r = self.container_post(update=True, quota=limit)
+        return r.headers
    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
+
+        :returns: (dict) response headers
        """
-        self.container_post(update=True, versioning=versioning)
+        r = self.container_post(update=True, versioning=versioning)
+        return r.headers
    def del_object(self, obj, until=None, delimiter=None):
        """
        :param delimiter: (str)
+
+        :returns: (dict) response headers
        """
        self._assert_container()
-        self.object_delete(obj, until=until, delimiter=delimiter)
+        r = self.object_delete(obj, until=until, delimiter=delimiter)
+        return r.headers
    def set_object_meta(self, obj, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
+
+        :returns: (dict) response headers
        """
        assert(type(metapairs) is dict)
-        self.object_post(obj, update=True, metadata=metapairs)
+        r = self.object_post(obj, update=True, metadata=metapairs)
+        return r.headers
    def del_object_meta(self, obj, metakey):
        """
        :param metakey: (str) metadatum key
+
+        :returns: (dict) response headers
        """
-        self.object_post(obj, update=True, metadata={metakey: ''})
+        # deletion is requested by posting the key with an empty value
+        r = self.object_post(obj, update=True, metadata={metakey: ''})
+        return r.headers
def publish_object(self, obj):
"""
"""
:param obj: (str) remote object path
"""
- self.object_post(obj, update=True, public=False)
+ r = self.object_post(obj, update=True, public=False)
+ return r.headers
def get_object_info(self, obj, version=None):
"""
    def set_object_sharing(
            self, obj,
-            read_permition=False, write_permition=False):
+            read_permission=False, write_permission=False):
        """Give read/write permisions to an object.
        :param obj: (str) remote object path
-        :param read_permition: (list - bool) users and user groups that get
-            read permition for this object - False means all previous read
+        :param read_permission: (list - bool) users and user groups that get
+            read permission for this object - False means all previous read
            permissions will be removed
-        :param write_perimition: (list - bool) of users and user groups to get
-            write permition for this object - False means all previous write
+        :param write_permission: (list - bool) of users and user groups to get
+            write permission for this object - False means all previous write
            permissions will be removed
+
+        :returns: (dict) response headers
        """
-        perms = dict(read=read_permition or '', write=write_permition or '')
-        self.object_post(obj, update=True, permissions=perms)
+        # False (the default) collapses to '' which clears that permission set
+        perms = dict(read=read_permission or '', write=write_permission or '')
+        r = self.object_post(obj, update=True, permissions=perms)
+        return r.headers
    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
+
+        :returns: (dict) response headers
        """
-        self.set_object_sharing(obj)
+        # defaults (False, False) wipe both read and write permissions
+        return self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param upload_db: progress.bar for uploading
+
+        :returns: (list) response headers of the per-block POST requests
        """
-
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
+        headers = {}
        if upload_cb:
-            upload_gen = upload_cb(nblocks)
-            upload_gen.next()
-        for i in range(nblocks):
-            block = source_file.read(min(blocksize, filesize - offset))
-            offset += len(block)
-            self.object_post(
-                obj,
-                update=True,
-                content_range='bytes */*',
-                content_type='application/octet-stream',
-                content_length=len(block),
-                data=block)
+            self.progress_bar_gen = upload_cb(nblocks)
+            self._cb_next()
+        flying = {}
+        self._init_thread_limit()
+        try:
+            for i in range(nblocks):
+                block = source_file.read(min(blocksize, filesize - offset))
+                offset += len(block)
-            if upload_cb:
-                upload_gen.next()
+                self._watch_thread_limit(flying.values())
+                unfinished = {}
+                # append each block with a concurrent POST
+                flying[i] = SilentEvent(
+                    method=self.object_post,
+                    obj=obj,
+                    update=True,
+                    content_range='bytes */*',
+                    content_type='application/octet-stream',
+                    content_length=len(block),
+                    data=block)
+                flying[i].start()
+
+                for key, thread in flying.items():
+                    if thread.isAlive():
+                        # NOTE(review): 'i < nblocks' is always true inside
+                        # this loop, so live threads are always deferred; any
+                        # still alive after the last iteration are never
+                        # joined and their headers are lost — only the
+                        # finally-sleep below waits for them. Confirm.
+                        if i < nblocks:
+                            unfinished[key] = thread
+                            continue
+                    thread.join()
+                    if thread.exception:
+                        raise thread.exception
+                    headers[key] = thread.value.headers
+                    self._cb_next()
+                flying = unfinished
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            # NOTE(review): unlike the upload methods, the interrupt is not
+            # re-raised; the method returns partial headers — confirm.
+        finally:
+            # crude grace period proportional to the number of live threads
+            from time import sleep
+            sleep(2 * len(activethreads()))
+        return headers.values()
    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path
        :param upto_bytes: max number of bytes to leave on file
+
+        :returns: (dict) response headers
        """
-        self.object_post(
+        # server-side truncation: the object is updated from itself
+        # (source_object points back at obj) limited to its first
+        # upto_bytes bytes
+        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
+        return r.headers
def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
"""Overwrite a part of an object from local source file
nblocks = 1 + (datasize - 1) // blocksize
offset = 0
if upload_cb:
- upload_gen = upload_cb(nblocks)
- upload_gen.next()
+ self.progress_bar_gen = upload_cb(nblocks)
+ self._cb_next()
+ headers = []
for i in range(nblocks):
read_size = min(blocksize, filesize - offset, datasize - offset)
block = source_file.read(read_size)
- self.object_post(
+ r = self.object_post(
obj,
update=True,
content_type='application/octet-stream',
start + offset,
start + offset + len(block) - 1),
data=block)
+ headers.append(dict(r.headers))
offset += len(block)
- if upload_cb:
- upload_gen.next()
+ self._cb_next
+ return headers
def copy_object(
self, src_container, src_object, dst_container,
:param content_type: (str)
:param delimiter: (str)
+
+ :returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
src_path = path4url(src_container, src_object)
- self.object_put(
+ r = self.object_put(
dst_object or src_object,
success=201,
copy_from=src_path,
public=public,
content_type=content_type,
delimiter=delimiter)
+ return r.headers
def move_object(
self, src_container, src_object, dst_container,
:param content_type: (str)
:param delimiter: (str)
+
+ :returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
dst_object = dst_object or src_object
src_path = path4url(src_container, src_object)
- self.object_put(
+ r = self.object_put(
dst_object,
success=201,
move_from=src_path,
public=public,
content_type=content_type,
delimiter=delimiter)
+ return r.headers
def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
"""Get accounts that share with self.account