-# Copyright 2011-2013 GRNET S.A. All rights reserved.
+# Copyright 2011-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
+from kamaki.clients.utils import path4url, filter_in, readall
def _pithos_hash(block, blockhash):
return h.hexdigest()
-def _range_up(start, end, a_range):
- if a_range:
- (rstart, rend) = a_range.split('-')
- (rstart, rend) = (int(rstart), int(rend))
- if rstart > end or rend < start:
- return (0, 0)
- if rstart > start:
- start = rstart
- if rend < end:
- end = rend
- return (start, end)
+def _range_up(start, end, max_value, a_range):
+ """
+ :param start: (int) the window bottom
+
+ :param end: (int) the window top
+
+ :param max_value: (int) maximum accepted value
+
+ :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
+ where X: x|x-y|-x with x <= y and x, y natural numbers
+
+ :returns: (str) a range string cut-off for the start-end range;
+ an empty response means this window is out of range
+ """
+ assert start >= 0, '_range_up called w. start(%s) < 0' % start
+ assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
+ end, start)
+ assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
+ max_value, end)
+ if not a_range:
+ return '%s-%s' % (start, end)
+ selected = []
+ for some_range in a_range.split(','):
+ v0, sep, v1 = some_range.partition('-')
+ if v0:
+ v0 = int(v0)
+ if sep:
+ v1 = int(v1)
+ if v1 < start or v0 > end or v1 < v0:
+ continue
+ v0 = v0 if v0 > start else start
+ v1 = v1 if v1 < end else end
+ selected.append('%s-%s' % (v0, v1))
+ elif v0 < start:
+ continue
+ else:
+ v1 = v0 if v0 <= end else end
+ selected.append('%s-%s' % (start, v1))
+ else:
+ v1 = int(v1)
+ if max_value - v1 > end:
+ continue
+ v0 = (max_value - v1) if max_value - v1 > start else start
+ selected.append('%s-%s' % (v0, end))
+ return ','.join(selected)
class PithosClient(PithosRestClient):
def create_container(
self,
- container=None, sizelimit=None, versioning=None, metadata=None):
+ container=None, sizelimit=None, versioning=None, metadata=None,
+ **kwargs):
"""
:param container: (str) if not given, self.container is used instead
try:
self.container = container or cnt_back_up
r = self.container_put(
- quota=sizelimit, versioning=versioning, metadata=metadata)
+ quota=sizelimit, versioning=versioning, metadata=metadata,
+ **kwargs)
return r.headers
finally:
self.container = cnt_back_up
def purge_container(self, container=None):
- """Delete an empty container and destroy associated blocks
- """
+ """Delete an empty container and destroy associated blocks"""
cnt_back_up = self.container
try:
self.container = container or cnt_back_up
raise ClientError(msg, 1)
f = StringIO(data)
else:
- data = f.read(size) if size else f.read()
+ data = readall(f, size) if size else f.read()
r = self.object_put(
obj,
data=data,
hash_gen = hash_cb(nblocks)
hash_gen.next()
- for i in range(nblocks):
- block = fileobj.read(min(blocksize, size - offset))
+ for i in xrange(nblocks):
+ block = readall(fileobj, min(blocksize, size - offset))
bytes = len(block)
+ if bytes <= 0:
+ break
hash = _pithos_hash(block, blockhash)
hashes.append(hash)
hmap[hash] = (offset, bytes)
offset += bytes
if hash_cb:
hash_gen.next()
- msg = 'Failed to calculate uploaded blocks:'
- ' Offset and object size do not match'
+ msg = ('Failed to calculate uploading blocks: '
+ 'read bytes(%s) != requested size (%s)' % (offset, size))
assert offset == size, msg
def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
for hash in missing:
offset, bytes = hmap[hash]
fileobj.seek(offset)
- data = fileobj.read(bytes)
+ data = readall(fileobj, bytes)
r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
blocksize, blockhash, size, nblocks) = self._get_file_block_info(
f, size, container_info_cache)
(hashes, hmap, offset) = ([], {}, 0)
- if not content_type:
- content_type = 'application/octet-stream'
+ content_type = content_type or 'application/octet-stream'
self._calculate_blocks_for_upload(
*block_info,
sendlog.info('%s blocks missing' % len(missing))
num_of_blocks = len(missing)
missing = self._upload_missing_blocks(
- missing,
- hmap,
- f,
- upload_gen)
+ missing, hmap, f, upload_gen)
if missing:
if num_of_blocks == len(missing):
retries -= 1
else:
break
if missing:
+ try:
+ details = ['%s' % thread.exception for thread in missing]
+ except Exception:
+ details = ['Also, failed to read thread exceptions']
raise ClientError(
'%s blocks failed to upload' % len(missing),
- details=['%s' % thread.exception for thread in missing])
+ details=details)
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
tries -= 1
old_failures = len(missing)
if missing:
- raise ClientError(
- '%s blocks failed to upload' % len(missing),
- details=['%s' % thread.exception for thread in missing])
+ raise ClientError('%s blocks failed to upload' % len(missing))
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():
thread.join()
raise
+ self._cb_next()
r = self.object_put(
obj,
format='json',
hashmap=True,
content_type=content_type,
+ content_encoding=content_encoding,
if_etag_match=if_etag_match,
if_etag_not_match='*' if if_not_exist else None,
etag=etag,
return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
def _dump_blocks_sync(
- self, obj, remote_hashes, blocksize, total_size, dst, range,
+ self, obj, remote_hashes, blocksize, total_size, dst, crange,
**args):
+ if not total_size:
+ return
for blockid, blockhash in enumerate(remote_hashes):
if blockhash:
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
- (start, end) = _range_up(start, end, range)
- args['data_range'] = 'bytes=%s-%s' % (start, end)
+ data_range = _range_up(start, end, total_size, crange)
+ if not data_range:
+ self._cb_next()
+ continue
+ args['data_range'] = 'bytes=%s' % data_range
r = self.object_get(obj, success=(200, 206), **args)
self._cb_next()
dst.write(r.content)
def _hash_from_file(self, fp, start, size, blockhash):
fp.seek(start)
- block = fp.read(size)
+ block = readall(fp, size)
h = newhashlib(blockhash)
h.update(block.strip('\x00'))
return hexlify(h.digest())
flying = dict()
blockid_dict = dict()
offset = 0
- if filerange is not None:
- rstart = int(filerange.split('-')[0])
- offset = rstart if blocksize > rstart else rstart % blocksize
self._init_thread_limit()
for block_hash, blockids in remote_hashes.items():
**restargs)
end = total_size - 1 if (
key + blocksize > total_size) else key + blocksize - 1
- start, end = _range_up(key, end, filerange)
- if start == end:
+ if end < key:
+ self._cb_next()
+ continue
+ data_range = _range_up(key, end, total_size, filerange)
+ if not data_range:
self._cb_next()
continue
- restargs['async_headers'] = {
- 'Range': 'bytes=%s-%s' % (start, end)}
+ restargs[
+ 'async_headers'] = {'Range': 'bytes=%s' % data_range}
flying[key] = self._get_block_async(obj, **restargs)
blockid_dict[key] = unsaved
start = blocksize * blockid
is_last = start + blocksize > total_size
end = (total_size - 1) if is_last else (start + blocksize - 1)
- (start, end) = _range_up(start, end, range_str)
- if start < end:
+ data_range_str = _range_up(start, end, end, range_str)
+ if data_range_str:
self._watch_thread_limit(flying.values())
+ restargs['data_range'] = 'bytes=%s' % data_range_str
flying[blockid] = self._get_block_async(obj, **restargs)
for runid, thread in flying.items():
if (blockid + 1) == num_of_blocks:
if_match=None,
if_none_match=None,
if_modified_since=None,
- if_unmodified_since=None,
- data_range=None):
+ if_unmodified_since=None):
"""
:param obj: (str) remote object path
:param if_unmodified_since: (str) formated date
- :param data_range: (str) from-to where from and to are integers
- denoting file positions in bytes
-
:returns: (list)
"""
try:
if_etag_match=if_match,
if_etag_not_match=if_none_match,
if_modified_since=if_modified_since,
- if_unmodified_since=if_unmodified_since,
- data_range=data_range)
+ if_unmodified_since=if_unmodified_since)
except ClientError as err:
if err.status == 304 or err.status == 412:
return {}
finally:
self.container = cnt_back_up
- def get_container_info(self, until=None):
+ def get_container_info(self, container=None, until=None):
"""
:param until: (str) formated date
:raises ClientError: 404 Container not found
"""
+ bck_cont = self.container
try:
+ self.container = container or bck_cont
+ self._assert_container()
r = self.container_head(until=until)
except ClientError as err:
err.details.append('for container %s' % self.container)
raise err
+ finally:
+ self.container = bck_cont
return r.headers
def get_container_meta(self, until=None):
:returns: (dict)
"""
return filter_in(
- self.get_container_info(until=until),
- 'X-Container-Meta')
+ self.get_container_info(until=until), 'X-Container-Meta')
def get_container_object_meta(self, until=None):
"""
:returns: (dict)
"""
return filter_in(
- self.get_container_info(until=until),
- 'X-Container-Object-Meta')
+ self.get_container_info(until=until), 'X-Container-Object-Meta')
def set_container_meta(self, metapairs):
"""
finally:
from time import sleep
sleep(2 * len(activethreads()))
+ self._cb_next()
return headers.values()
def truncate_object(self, obj, upto_bytes):
:returns: (dict) response headers
"""
+ ctype = self.get_object_info(obj)['content-type']
r = self.object_post(
obj,
update=True,
content_range='bytes 0-%s/*' % upto_bytes,
- content_type='application/octet-stream',
+ content_type=ctype,
object_bytes=upto_bytes,
source_object=path4url(self.container, obj))
return r.headers
- def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
+ def overwrite_object(
+ self, obj, start, end, source_file,
+ source_version=None, upload_cb=None):
"""Overwrite a part of an object from local source file
+ ATTENTION: content_type must always be application/octet-stream
:param obj: (str) remote object path
:param upload_db: progress.bar for uploading
"""
- r = self.get_object_info(obj)
- rf_size = int(r['content-length'])
- if rf_size < int(start):
- raise ClientError(
- 'Range start exceeds file size',
- status=416)
- elif rf_size < int(end):
- raise ClientError(
- 'Range end exceeds file size',
- status=416)
self._assert_container()
+ r = self.get_object_info(obj, version=source_version)
+ rf_size = int(r['content-length'])
+ start, end = int(start), int(end)
+ assert rf_size >= start, 'Range start %s exceeds file size %s' % (
+ start, rf_size)
meta = self.get_container_info()
blocksize = int(meta['x-container-block-size'])
filesize = fstat(source_file.fileno()).st_size
- datasize = int(end) - int(start) + 1
+ datasize = end - start + 1
nblocks = 1 + (datasize - 1) // blocksize
offset = 0
if upload_cb:
content_range='bytes %s-%s/*' % (
start + offset,
start + offset + len(block) - 1),
+ source_version=source_version,
data=block)
headers.append(dict(r.headers))
offset += len(block)
-
- self._cb_next
+ self._cb_next()
+ self._cb_next()
return headers
def copy_object(