-# Copyright 2011-2013 GRNET S.A. All rights reserved.
+# Copyright 2011-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
from os import fstat
from hashlib import new as newhashlib
from time import time
+from StringIO import StringIO
from binascii import hexlify
from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
-from StringIO import StringIO
+from kamaki.clients.utils import path4url, filter_in, readall
def _pithos_hash(block, blockhash):
return h.hexdigest()
-def _range_up(start, end, a_range):
- if a_range:
- (rstart, rend) = a_range.split('-')
- (rstart, rend) = (int(rstart), int(rend))
- if rstart > end or rend < start:
- return (0, 0)
- if rstart > start:
- start = rstart
- if rend < end:
- end = rend
- return (start, end)
+def _range_up(start, end, max_value, a_range):
+ """
+ :param start: (int) the window bottom
+
+ :param end: (int) the window top
+
+ :param max_value: (int) maximum accepted value
+
+ :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
+ where X: x|x-y|-x where x < y and x, y natural numbers
+
+ :returns: (str) a range string cut-off for the start-end range
+ an empty response means this window is out of range
+ """
+ assert start >= 0, '_range_up called w. start(%s) < 0' % start
+ assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
+ end, start)
+ assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
+ max_value, end)
+ if not a_range:
+ return '%s-%s' % (start, end)
+ selected = []
+ for some_range in a_range.split(','):
+ v0, sep, v1 = some_range.partition('-')
+ if v0:
+ v0 = int(v0)
+ if sep:
+ v1 = int(v1)
+ if v1 < start or v0 > end or v1 < v0:
+ continue
+ v0 = v0 if v0 > start else start
+ v1 = v1 if v1 < end else end
+ selected.append('%s-%s' % (v0, v1))
+ elif v0 < start:
+ continue
+ else:
+ v1 = v0 if v0 <= end else end
+ selected.append('%s-%s' % (start, v1))
+ else:
+ v1 = int(v1)
+ if max_value - v1 > end:
+ continue
+ v0 = (max_value - v1) if max_value - v1 > start else start
+ selected.append('%s-%s' % (v0, end))
+ return ','.join(selected)
class PithosClient(PithosRestClient):
- """GRNet Pithos API client"""
+ """Synnefo Pithos+ API client"""
def __init__(self, base_url, token, account=None, container=None):
super(PithosClient, self).__init__(base_url, token, account, container)
- def purge_container(self, container=None):
- """Delete an empty container and destroy associated blocks
+ def create_container(
+ self,
+ container=None, sizelimit=None, versioning=None, metadata=None,
+ **kwargs):
+ """
+ :param container: (str) if not given, self.container is used instead
+
+ :param sizelimit: (int) container total size limit in bytes
+
+ :param versioning: (str) can be auto or whatever supported by server
+
+ :param metadata: (dict) Custom user-defined metadata of the form
+ { 'name1': 'value1', 'name2': 'value2', ... }
+
+ :returns: (dict) response headers
"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
- self.container_delete(until=unicode(time()))
+ r = self.container_put(
+ quota=sizelimit, versioning=versioning, metadata=metadata,
+ **kwargs)
+ return r.headers
finally:
self.container = cnt_back_up
+ def purge_container(self, container=None):
+ """Delete an empty container and destroy associated blocks"""
+ cnt_back_up = self.container
+ try:
+ self.container = container or cnt_back_up
+ r = self.container_delete(until=unicode(time()))
+ finally:
+ self.container = cnt_back_up
+ return r.headers
+
def upload_object_unchunked(
self, obj, f,
withHashFile=False,
'write':[usr and/or grp names]}
:param public: (bool)
+
+ :returns: (dict) created object metadata
"""
self._assert_container()
raise ClientError(msg, 1)
f = StringIO(data)
else:
- data = f.read(size) if size else f.read()
- self.object_put(
+ data = readall(f, size) if size else f.read()
+ r = self.object_put(
obj,
data=data,
etag=etag,
permissions=sharing,
public=public,
success=201)
+ return r.headers
def create_object_by_manifestation(
self, obj,
'write':[usr and/or grp names]}
:param public: (bool)
+
+ :returns: (dict) created object metadata
"""
self._assert_container()
- self.object_put(
+ r = self.object_put(
obj,
content_length=0,
etag=etag,
permissions=sharing,
public=public,
manifest='%s/%s' % (self.container, obj))
+ return r.headers
# upload_* auxiliary methods
- def _put_block_async(self, data, hash, upload_gen=None):
+ def _put_block_async(self, data, hash):
event = SilentEvent(method=self._put_block, data=data, hash=hash)
event.start()
return event
format='json')
assert r.json[0] == hash, 'Local hash does not match server'
- def _get_file_block_info(self, fileobj, size=None):
- meta = self.get_container_info()
+ def _get_file_block_info(self, fileobj, size=None, cache=None):
+ """
+ :param fileobj: (file descriptor) source
+
+ :param size: (int) size of data to upload from source
+
+ :param cache: (dict) if provided, cache container info response to
+ avoid redundant calls
+ """
+ if isinstance(cache, dict):
+ try:
+ meta = cache[self.container]
+ except KeyError:
+ meta = self.get_container_info()
+ cache[self.container] = meta
+ else:
+ meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
blockhash = meta['x-container-block-hash']
size = size if size is not None else fstat(fileobj.fileno()).st_size
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
- def _get_missing_hashes(
+ def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
hashmap=True,
content_type=None,
+ if_etag_match=None,
+ if_etag_not_match=None,
content_encoding=None,
content_disposition=None,
permissions=None,
hashmap=True,
content_type=content_type,
json=json,
+ if_etag_match=if_etag_match,
+ if_etag_not_match=if_etag_not_match,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=permissions,
public=public,
success=success)
- return None if r.status_code == 201 else r.json
+ return (None if r.status_code == 201 else r.json), r.headers
- def _culculate_blocks_for_upload(
+ def _calculate_blocks_for_upload(
self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
hash_cb=None):
offset = 0
hash_gen = hash_cb(nblocks)
hash_gen.next()
- for i in range(nblocks):
- block = fileobj.read(min(blocksize, size - offset))
+ for i in xrange(nblocks):
+ block = readall(fileobj, min(blocksize, size - offset))
bytes = len(block)
+ if bytes <= 0:
+ break
hash = _pithos_hash(block, blockhash)
hashes.append(hash)
hmap[hash] = (offset, bytes)
offset += bytes
if hash_cb:
hash_gen.next()
- msg = 'Failed to calculate uploaded blocks:'
- ' Offset and object size do not match'
+ msg = ('Failed to calculate uploading blocks: '
+ 'read bytes(%s) != requested size (%s)' % (offset, size))
assert offset == size, msg
def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
for hash in missing:
offset, bytes = hmap[hash]
fileobj.seek(offset)
- data = fileobj.read(bytes)
- r = self._put_block_async(data, hash, upload_gen)
+ data = readall(fileobj, bytes)
+ r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
hash_cb=None,
upload_cb=None,
etag=None,
+ if_etag_match=None,
if_not_exist=None,
content_encoding=None,
content_disposition=None,
content_type=None,
sharing=None,
- public=None):
+ public=None,
+ container_info_cache=None):
"""Upload an object using multiple connections (threads)
:param obj: (str) remote object path
:param etag: (str)
+ :param if_etag_match: (str) Push that value to if-match header at file
+ creation
+
:param if_not_exist: (bool) If true, the file will be uploaded ONLY if
it does not exist remotely, otherwise the operation will fail.
Involves the case of an object with the same path is created while
'write':[usr and/or grp names]}
:param public: (bool)
+
+ :param container_info_cache: (dict) if given, avoid redundant calls to
+ server for container info (block size and hash information)
"""
self._assert_container()
- #init
- block_info = (blocksize, blockhash, size, nblocks) =\
- self._get_file_block_info(f, size)
+ block_info = (
+ blocksize, blockhash, size, nblocks) = self._get_file_block_info(
+ f, size, container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
- if not content_type:
- content_type = 'application/octet-stream'
+ content_type = content_type or 'application/octet-stream'
- self._culculate_blocks_for_upload(
+ self._calculate_blocks_for_upload(
*block_info,
hashes=hashes,
hmap=hmap,
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
- missing = self._get_missing_hashes(
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
content_encoding=content_encoding,
content_disposition=content_disposition,
permissions=sharing,
public=public)
if missing is None:
- return
+ return obj_headers
if upload_cb:
upload_gen = upload_cb(len(missing))
sendlog.info('%s blocks missing' % len(missing))
num_of_blocks = len(missing)
missing = self._upload_missing_blocks(
- missing,
- hmap,
- f,
- upload_gen)
+ missing, hmap, f, upload_gen)
if missing:
if num_of_blocks == len(missing):
retries -= 1
else:
break
if missing:
+ try:
+ details = ['%s' % thread.exception for thread in missing]
+ except Exception:
+ details = ['Also, failed to read thread exceptions']
raise ClientError(
'%s blocks failed to upload' % len(missing),
- status=800)
+ details=details)
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
- self.object_put(
+ r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
+ content_encoding=content_encoding,
+ if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
json=hashmap,
+ permissions=sharing,
+ public=public,
success=201)
+ return r.headers
+
+ def upload_from_string(
+ self, obj, input_str,
+ hash_cb=None,
+ upload_cb=None,
+ etag=None,
+ if_etag_match=None,
+ if_not_exist=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None,
+ container_info_cache=None):
+ """Upload an object using multiple connections (threads)
+
+ :param obj: (str) remote object path
+
+ :param input_str: (str) upload content
+
+ :param hash_cb: optional progress.bar object for calculating hashes
+
+ :param upload_cb: optional progress.bar object for uploading
+
+ :param etag: (str)
+
+ :param if_etag_match: (str) Push that value to if-match header at file
+ creation
+
+ :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+ it does not exist remotely, otherwise the operation will fail.
+ Involves the case of an object with the same path is created while
+ the object is being uploaded.
+
+ :param content_encoding: (str)
+
+ :param content_disposition: (str)
+
+ :param content_type: (str)
+
+ :param sharing: {'read':[user and/or grp names],
+ 'write':[usr and/or grp names]}
+
+ :param public: (bool)
+
+ :param container_info_cache: (dict) if given, avoid redundant calls to
+ server for container info (block size and hash information)
+ """
+ self._assert_container()
+
+ blocksize, blockhash, size, nblocks = self._get_file_block_info(
+ fileobj=None, size=len(input_str), cache=container_info_cache)
+ (hashes, hmap, offset) = ([], {}, 0)
+ if not content_type:
+ content_type = 'application/octet-stream'
+
+ hashes = []
+ hmap = {}
+ for blockid in range(nblocks):
+ start = blockid * blocksize
+ block = input_str[start: (start + blocksize)]
+ hashes.append(_pithos_hash(block, blockhash))
+ hmap[hashes[blockid]] = (start, block)
+
+ hashmap = dict(bytes=size, hashes=hashes)
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
+ obj, hashmap,
+ content_type=content_type,
+ size=size,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ content_encoding=content_encoding,
+ content_disposition=content_disposition,
+ permissions=sharing,
+ public=public)
+ if missing is None:
+ return obj_headers
+ num_of_missing = len(missing)
+
+ if upload_cb:
+ self.progress_bar_gen = upload_cb(nblocks)
+ for i in range(nblocks + 1 - num_of_missing):
+ self._cb_next()
+
+ tries = 7
+ old_failures = 0
+ try:
+ while tries and missing:
+ flying = []
+ failures = []
+ for hash in missing:
+ offset, block = hmap[hash]
+ bird = self._put_block_async(block, hash)
+ flying.append(bird)
+ unfinished = self._watch_thread_limit(flying)
+ for thread in set(flying).difference(unfinished):
+ if thread.exception:
+ failures.append(thread.kwargs['hash'])
+ if thread.isAlive():
+ flying.append(thread)
+ else:
+ self._cb_next()
+ flying = unfinished
+ for thread in flying:
+ thread.join()
+ if thread.exception:
+ failures.append(thread.kwargs['hash'])
+ self._cb_next()
+ missing = failures
+ if missing and len(missing) == old_failures:
+ tries -= 1
+ old_failures = len(missing)
+ if missing:
+ raise ClientError('%s blocks failed to upload' % len(missing))
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ raise
+ self._cb_next()
+
+ r = self.object_put(
+ obj,
+ format='json',
+ hashmap=True,
+ content_type=content_type,
+ content_encoding=content_encoding,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ etag=etag,
+ json=hashmap,
+ permissions=sharing,
+ public=public,
+ success=201)
+ return r.headers
# download_* auxiliary methods
def _get_remote_blocks_info(self, obj, **restargs):
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _dump_blocks_sync(
- self, obj, remote_hashes, blocksize, total_size, dst, range,
+ self, obj, remote_hashes, blocksize, total_size, dst, crange,
**args):
+ if not total_size:
+ return
for blockid, blockhash in enumerate(remote_hashes):
if blockhash:
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
- (start, end) = _range_up(start, end, range)
- args['data_range'] = 'bytes=%s-%s' % (start, end)
+ data_range = _range_up(start, end, total_size, crange)
+ if not data_range:
+ self._cb_next()
+ continue
+ args['data_range'] = 'bytes=%s' % data_range
r = self.object_get(obj, success=(200, 206), **args)
self._cb_next()
dst.write(r.content)
def _hash_from_file(self, fp, start, size, blockhash):
fp.seek(start)
- block = fp.read(size)
+ block = readall(fp, size)
h = newhashlib(blockhash)
h.update(block.strip('\x00'))
return hexlify(h.digest())
- e.g. if the range is 10-100, all blocks will be written to
normal_position - 10
"""
- for i, (key, g) in enumerate(flying.items()):
+ for key, g in flying.items():
if g.isAlive():
continue
if g.exception:
flying = dict()
blockid_dict = dict()
offset = 0
- if filerange is not None:
- rstart = int(filerange.split('-')[0])
- offset = rstart if blocksize > rstart else rstart % blocksize
self._init_thread_limit()
for block_hash, blockids in remote_hashes.items():
self._thread2file(
flying, blockid_dict, local_file, offset,
**restargs)
- end = total_size - 1 if key + blocksize > total_size\
- else key + blocksize - 1
- start, end = _range_up(key, end, filerange)
- if start == end:
+ end = total_size - 1 if (
+ key + blocksize > total_size) else key + blocksize - 1
+ if end < key:
+ self._cb_next()
+ continue
+ data_range = _range_up(key, end, total_size, filerange)
+ if not data_range:
self._cb_next()
continue
- restargs['async_headers'] = {
- 'Range': 'bytes=%s-%s' % (start, end)}
+ restargs[
+ 'async_headers'] = {'Range': 'bytes=%s' % data_range}
flying[key] = self._get_block_async(obj, **restargs)
blockid_dict[key] = unsaved
self._complete_cb()
+ def download_to_string(
+ self, obj,
+ download_cb=None,
+ version=None,
+ range_str=None,
+ if_match=None,
+ if_none_match=None,
+ if_modified_since=None,
+ if_unmodified_since=None):
+ """Download an object to a string (multiple connections). This method
+ uses threads for http requests, but stores all content in memory.
+
+ :param obj: (str) remote object path
+
+ :param download_cb: optional progress.bar object for downloading
+
+ :param version: (str) file version
+
+ :param range_str: (str) from, to are file positions (int) in bytes
+
+ :param if_match: (str)
+
+ :param if_none_match: (str)
+
+ :param if_modified_since: (str) formated date
+
+ :param if_unmodified_since: (str) formated date
+
+ :returns: (str) the whole object contents
+ """
+ restargs = dict(
+ version=version,
+ data_range=None if range_str is None else 'bytes=%s' % range_str,
+ if_match=if_match,
+ if_none_match=if_none_match,
+ if_modified_since=if_modified_since,
+ if_unmodified_since=if_unmodified_since)
+
+ (
+ blocksize,
+ blockhash,
+ total_size,
+ hash_list,
+ remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
+ assert total_size >= 0
+
+ if download_cb:
+ self.progress_bar_gen = download_cb(len(hash_list))
+ self._cb_next()
+
+ num_of_blocks = len(remote_hashes)
+ ret = [''] * num_of_blocks
+ self._init_thread_limit()
+ flying = dict()
+ try:
+ for blockid, blockhash in enumerate(remote_hashes):
+ start = blocksize * blockid
+ is_last = start + blocksize > total_size
+ end = (total_size - 1) if is_last else (start + blocksize - 1)
+ data_range_str = _range_up(start, end, end, range_str)
+ if data_range_str:
+ self._watch_thread_limit(flying.values())
+ restargs['data_range'] = 'bytes=%s' % data_range_str
+ flying[blockid] = self._get_block_async(obj, **restargs)
+ for runid, thread in flying.items():
+ if (blockid + 1) == num_of_blocks:
+ thread.join()
+ elif thread.isAlive():
+ continue
+ if thread.exception:
+ raise thread.exception
+ ret[runid] = thread.value.content
+ self._cb_next()
+ flying.pop(runid)
+ return ''.join(ret)
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+
#Command Progress Bar method
def _cb_next(self, step=1):
if hasattr(self, 'progress_bar_gen'):
if_match=None,
if_none_match=None,
if_modified_since=None,
- if_unmodified_since=None,
- data_range=None):
+ if_unmodified_since=None):
"""
:param obj: (str) remote object path
:param if_unmodified_since: (str) formated date
- :param data_range: (str) from-to where from and to are integers
- denoting file positions in bytes
-
:returns: (list)
"""
try:
if_etag_match=if_match,
if_etag_not_match=if_none_match,
if_modified_since=if_modified_since,
- if_unmodified_since=if_unmodified_since,
- data_range=data_range)
+ if_unmodified_since=if_unmodified_since)
except ClientError as err:
if err.status == 304 or err.status == 412:
return {}
:param usernames: (list)
"""
- self.account_post(update=True, groups={group: usernames})
+ r = self.account_post(update=True, groups={group: usernames})
+ return r
def del_account_group(self, group):
"""
'X-Account-Policy-Quota',
exactMatch=True)
- def get_account_versioning(self):
- """
- :returns: (dict)
- """
- return filter_in(
- self.get_account_info(),
- 'X-Account-Policy-Versioning',
- exactMatch=True)
+ #def get_account_versioning(self):
+ # """
+ # :returns: (dict)
+ # """
+ # return filter_in(
+ # self.get_account_info(),
+ # 'X-Account-Policy-Versioning',
+ # exactMatch=True)
def get_account_meta(self, until=None):
"""
- :meta until: (str) formated date
+ :param until: (str) formated date
:returns: (dict)
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.account_post(update=True, metadata=metapairs)
+ r = self.account_post(update=True, metadata=metapairs)
+ return r.headers
def del_account_meta(self, metakey):
"""
:param metakey: (str) metadatum key
"""
- self.account_post(update=True, metadata={metakey: ''})
+ r = self.account_post(update=True, metadata={metakey: ''})
+ return r.headers
- def set_account_quota(self, quota):
- """
- :param quota: (int)
- """
- self.account_post(update=True, quota=quota)
+ #def set_account_quota(self, quota):
+ # """
+ # :param quota: (int)
+ # """
+ # self.account_post(update=True, quota=quota)
- def set_account_versioning(self, versioning):
- """
- "param versioning: (str)
- """
- self.account_post(update=True, versioning=versioning)
+ #def set_account_versioning(self, versioning):
+ # """
+ # :param versioning: (str)
+ # """
+ # r = self.account_post(update=True, versioning=versioning)
+ # return r.headers
def list_containers(self):
"""
raise ClientError(
'Container "%s" is not empty' % self.container,
r.status_code)
+ return r.headers
def get_container_versioning(self, container=None):
"""
finally:
self.container = cnt_back_up
- def get_container_quota(self, container=None):
+ def get_container_limit(self, container=None):
"""
:param container: (str)
finally:
self.container = cnt_back_up
- def get_container_info(self, until=None):
+ def get_container_info(self, container=None, until=None):
"""
:param until: (str) formated date
:raises ClientError: 404 Container not found
"""
+ bck_cont = self.container
try:
+ self.container = container or bck_cont
+ self._assert_container()
r = self.container_head(until=until)
except ClientError as err:
err.details.append('for container %s' % self.container)
raise err
+ finally:
+ self.container = bck_cont
return r.headers
def get_container_meta(self, until=None):
:returns: (dict)
"""
return filter_in(
- self.get_container_info(until=until),
- 'X-Container-Meta')
+ self.get_container_info(until=until), 'X-Container-Meta')
def get_container_object_meta(self, until=None):
"""
:returns: (dict)
"""
return filter_in(
- self.get_container_info(until=until),
- 'X-Container-Object-Meta')
+ self.get_container_info(until=until), 'X-Container-Object-Meta')
def set_container_meta(self, metapairs):
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.container_post(update=True, metadata=metapairs)
+ r = self.container_post(update=True, metadata=metapairs)
+ return r.headers
def del_container_meta(self, metakey):
"""
:param metakey: (str) metadatum key
+
+ :returns: (dict) response headers
"""
- self.container_post(update=True, metadata={metakey: ''})
+ r = self.container_post(update=True, metadata={metakey: ''})
+ return r.headers
- def set_container_quota(self, quota):
+ def set_container_limit(self, limit):
"""
- :param quota: (int)
+ :param limit: (int)
"""
- self.container_post(update=True, quota=quota)
+ r = self.container_post(update=True, quota=limit)
+ return r.headers
def set_container_versioning(self, versioning):
"""
:param versioning: (str)
"""
- self.container_post(update=True, versioning=versioning)
+ r = self.container_post(update=True, versioning=versioning)
+ return r.headers
def del_object(self, obj, until=None, delimiter=None):
"""
:param delimiter: (str)
"""
self._assert_container()
- self.object_delete(obj, until=until, delimiter=delimiter)
+ r = self.object_delete(obj, until=until, delimiter=delimiter)
+ return r.headers
def set_object_meta(self, obj, metapairs):
"""
:param metapairs: (dict) {key1:val1, key2:val2, ...}
"""
assert(type(metapairs) is dict)
- self.object_post(obj, update=True, metadata=metapairs)
+ r = self.object_post(obj, update=True, metadata=metapairs)
+ return r.headers
def del_object_meta(self, obj, metakey):
"""
:param metakey: (str) metadatum key
"""
- self.object_post(obj, update=True, metadata={metakey: ''})
+ r = self.object_post(obj, update=True, metadata={metakey: ''})
+ return r.headers
def publish_object(self, obj):
"""
"""
self.object_post(obj, update=True, public=True)
info = self.get_object_info(obj)
+ return info['x-object-public']
pref, sep, rest = self.base_url.partition('//')
base = rest.split('/')[0]
return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
"""
:param obj: (str) remote object path
"""
- self.object_post(obj, update=True, public=False)
+ r = self.object_post(obj, update=True, public=False)
+ return r.headers
def get_object_info(self, obj, version=None):
"""
def set_object_sharing(
self, obj,
- read_permition=False, write_permition=False):
+ read_permission=False, write_permission=False):
"""Give read/write permisions to an object.
:param obj: (str) remote object path
- :param read_permition: (list - bool) users and user groups that get
- read permition for this object - False means all previous read
+ :param read_permission: (list - bool) users and user groups that get
+ read permission for this object - False means all previous read
permissions will be removed
- :param write_perimition: (list - bool) of users and user groups to get
- write permition for this object - False means all previous write
+ :param write_permission: (list - bool) of users and user groups to get
+ write permission for this object - False means all previous write
permissions will be removed
+
+ :returns: (dict) response headers
"""
- perms = dict(read=read_permition or '', write=write_permition or '')
- self.object_post(obj, update=True, permissions=perms)
+ perms = dict(read=read_permission or '', write=write_permission or '')
+ r = self.object_post(obj, update=True, permissions=perms)
+ return r.headers
def del_object_sharing(self, obj):
"""
:param obj: (str) remote object path
"""
- self.set_object_sharing(obj)
+ return self.set_object_sharing(obj)
def append_object(self, obj, source_file, upload_cb=None):
"""
:param upload_db: progress.bar for uploading
"""
-
self._assert_container()
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
filesize = fstat(source_file.fileno()).st_size
nblocks = 1 + (filesize - 1) // blocksize
offset = 0
+ headers = {}
if upload_cb:
- upload_gen = upload_cb(nblocks)
- upload_gen.next()
- for i in range(nblocks):
- block = source_file.read(min(blocksize, filesize - offset))
- offset += len(block)
- self.object_post(
- obj,
- update=True,
- content_range='bytes */*',
- content_type='application/octet-stream',
- content_length=len(block),
- data=block)
+ self.progress_bar_gen = upload_cb(nblocks)
+ self._cb_next()
+ flying = {}
+ self._init_thread_limit()
+ try:
+ for i in range(nblocks):
+ block = source_file.read(min(blocksize, filesize - offset))
+ offset += len(block)
- if upload_cb:
- upload_gen.next()
+ self._watch_thread_limit(flying.values())
+ unfinished = {}
+ flying[i] = SilentEvent(
+ method=self.object_post,
+ obj=obj,
+ update=True,
+ content_range='bytes */*',
+ content_type='application/octet-stream',
+ content_length=len(block),
+ data=block)
+ flying[i].start()
+
+ for key, thread in flying.items():
+ if thread.isAlive():
+ if i < nblocks:
+ unfinished[key] = thread
+ continue
+ thread.join()
+ if thread.exception:
+ raise thread.exception
+ headers[key] = thread.value.headers
+ self._cb_next()
+ flying = unfinished
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ finally:
+ from time import sleep
+ sleep(2 * len(activethreads()))
+ self._cb_next()
+ return headers.values()
def truncate_object(self, obj, upto_bytes):
"""
:param obj: (str) remote object path
:param upto_bytes: max number of bytes to leave on file
+
+ :returns: (dict) response headers
"""
- self.object_post(
+ ctype = self.get_object_info(obj)['content-type']
+ r = self.object_post(
obj,
update=True,
content_range='bytes 0-%s/*' % upto_bytes,
- content_type='application/octet-stream',
+ content_type=ctype,
object_bytes=upto_bytes,
source_object=path4url(self.container, obj))
+ return r.headers
- def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
+ def overwrite_object(
+ self, obj, start, end, source_file,
+ source_version=None, upload_cb=None):
"""Overwrite a part of an object from local source file
+ ATTENTION: content_type must always be application/octet-stream
:param obj: (str) remote object path
:param upload_db: progress.bar for uploading
"""
- r = self.get_object_info(obj)
- rf_size = int(r['content-length'])
- if rf_size < int(start):
- raise ClientError(
- 'Range start exceeds file size',
- status=416)
- elif rf_size < int(end):
- raise ClientError(
- 'Range end exceeds file size',
- status=416)
self._assert_container()
+ r = self.get_object_info(obj, version=source_version)
+ rf_size = int(r['content-length'])
+ start, end = int(start), int(end)
+ assert rf_size >= start, 'Range start %s exceeds file size %s' % (
+ start, rf_size)
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
filesize = fstat(source_file.fileno()).st_size
- datasize = int(end) - int(start) + 1
+ datasize = end - start + 1
nblocks = 1 + (datasize - 1) // blocksize
offset = 0
if upload_cb:
- upload_gen = upload_cb(nblocks)
- upload_gen.next()
+ self.progress_bar_gen = upload_cb(nblocks)
+ self._cb_next()
+ headers = []
for i in range(nblocks):
read_size = min(blocksize, filesize - offset, datasize - offset)
block = source_file.read(read_size)
- self.object_post(
+ r = self.object_post(
obj,
update=True,
content_type='application/octet-stream',
content_range='bytes %s-%s/*' % (
start + offset,
start + offset + len(block) - 1),
+ source_version=source_version,
data=block)
+ headers.append(dict(r.headers))
offset += len(block)
-
- if upload_cb:
- upload_gen.next()
+ self._cb_next()
+ self._cb_next()
+ return headers
def copy_object(
self, src_container, src_object, dst_container,
:param content_type: (str)
:param delimiter: (str)
+
+ :returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
src_path = path4url(src_container, src_object)
- self.object_put(
+ r = self.object_put(
dst_object or src_object,
success=201,
copy_from=src_path,
public=public,
content_type=content_type,
delimiter=delimiter)
+ return r.headers
def move_object(
self, src_container, src_object, dst_container,
:param content_type: (str)
:param delimiter: (str)
+
+ :returns: (dict) response headers
"""
self._assert_account()
self.container = dst_container
dst_object = dst_object or src_object
src_path = path4url(src_container, src_object)
- self.object_put(
+ r = self.object_put(
dst_object,
success=201,
move_from=src_path,
public=public,
content_type=content_type,
delimiter=delimiter)
+ return r.headers
def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
"""Get accounts that share with self.account