Update copyright dates for changed files
[kamaki] / kamaki / clients / pithos / __init__.py
index d010623..4eb7153 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011-2013 GRNET S.A. All rights reserved.
+# Copyright 2011-2014 GRNET S.A. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
@@ -43,7 +43,7 @@ from binascii import hexlify
 from kamaki.clients import SilentEvent, sendlog
 from kamaki.clients.pithos.rest_api import PithosRestClient
 from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
+from kamaki.clients.utils import path4url, filter_in, readall
 
 
 def _pithos_hash(block, blockhash):
@@ -52,17 +52,51 @@ def _pithos_hash(block, blockhash):
     return h.hexdigest()
 
 
-def _range_up(start, end, a_range):
-    if a_range:
-        (rstart, rend) = a_range.split('-')
-        (rstart, rend) = (int(rstart), int(rend))
-        if rstart > end or rend < start:
-            return (0, 0)
-        if rstart > start:
-            start = rstart
-        if rend < end:
-            end = rend
-    return (start, end)
+def _range_up(start, end, max_value, a_range):
+    """Clip a range specification to the window [start, end]
+
+    :param start: (int) the window bottom (inclusive)
+
+    :param end: (int) the window top (inclusive)
+
+    :param max_value: (int) the size the ranges refer to; suffix ranges
+        (-x) are counted back from it and open-ended ranges (x-) may
+        extend up to it
+
+    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
+        where X is one of (x, y natural numbers, x <= y):
+        x-y (from x to y), x- (from x on), x (from 0 to x),
+        -x (the last x positions before max_value); empty X are ignored
+
+    :returns: (str) comma-separated 'a-b' ranges cut-off for the start-end
+        window, with start <= a <= b <= end; an empty response means this
+        window is out of range
+    """
+    assert start >= 0, '_range_up called w. start(%s) < 0' % start
+    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
+        end, start)
+    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
+        max_value, end)
+    if not a_range:
+        return '%s-%s' % (start, end)
+    selected = []
+    for some_range in a_range.split(','):
+        v0, sep, v1 = some_range.strip().partition('-')
+        if not (v0 or v1):
+            continue  # an empty entry, e.g. from a trailing comma
+        if not v0:
+            # -x: the last x positions, i.e. [max_value - x, end]
+            lo, hi = max_value - int(v1), end
+        elif not sep:
+            # x: everything from 0 up to x
+            lo, hi = 0, int(v0)
+        else:
+            # x-y, or the open-ended x- which reaches max_value
+            lo, hi = int(v0), (int(v1) if v1 else max_value)
+        if lo > hi or hi < start or lo > end:
+            continue  # inverted, or entirely outside the window
+        selected.append('%s-%s' % (max(lo, start), min(hi, end)))
+    return ','.join(selected)
 
 
 class PithosClient(PithosRestClient):
@@ -73,7 +107,8 @@ class PithosClient(PithosRestClient):
 
     def create_container(
             self,
-            container=None, sizelimit=None, versioning=None, metadata=None):
+            container=None, sizelimit=None, versioning=None, metadata=None,
+            **kwargs):
         """
         :param container: (str) if not given, self.container is used instead
 
@@ -90,14 +125,14 @@ class PithosClient(PithosRestClient):
         try:
             self.container = container or cnt_back_up
             r = self.container_put(
-                quota=sizelimit, versioning=versioning, metadata=metadata)
+                quota=sizelimit, versioning=versioning, metadata=metadata,
+                **kwargs)
             return r.headers
         finally:
             self.container = cnt_back_up
 
     def purge_container(self, container=None):
-        """Delete an empty container and destroy associated blocks
-        """
+        """Delete an empty container and destroy associated blocks"""
         cnt_back_up = self.container
         try:
             self.container = container or cnt_back_up
@@ -154,7 +189,7 @@ class PithosClient(PithosRestClient):
                 raise ClientError(msg, 1)
             f = StringIO(data)
         else:
-            data = f.read(size) if size else f.read()
+            data = readall(f, size) if size else f.read()
         r = self.object_put(
             obj,
             data=data,
@@ -280,17 +315,19 @@ class PithosClient(PithosRestClient):
             hash_gen = hash_cb(nblocks)
             hash_gen.next()
 
-        for i in range(nblocks):
-            block = fileobj.read(min(blocksize, size - offset))
+        for i in xrange(nblocks):
+            block = readall(fileobj, min(blocksize, size - offset))
             bytes = len(block)
+            if bytes <= 0:
+                break
             hash = _pithos_hash(block, blockhash)
             hashes.append(hash)
             hmap[hash] = (offset, bytes)
             offset += bytes
             if hash_cb:
                 hash_gen.next()
-        msg = 'Failed to calculate uploaded blocks:'
-        ' Offset and object size do not match'
+        msg = ('Failed to calculate uploading blocks: '
+               'read bytes(%s) != requested size (%s)' % (offset, size))
         assert offset == size, msg
 
     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
@@ -303,7 +340,7 @@ class PithosClient(PithosRestClient):
         for hash in missing:
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
-            data = fileobj.read(bytes)
+            data = readall(fileobj, bytes)
             r = self._put_block_async(data, hash)
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
@@ -389,8 +426,7 @@ class PithosClient(PithosRestClient):
             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                 f, size, container_info_cache)
         (hashes, hmap, offset) = ([], {}, 0)
-        if not content_type:
-            content_type = 'application/octet-stream'
+        content_type = content_type or 'application/octet-stream'
 
         self._calculate_blocks_for_upload(
             *block_info,
@@ -430,10 +466,7 @@ class PithosClient(PithosRestClient):
                 sendlog.info('%s blocks missing' % len(missing))
                 num_of_blocks = len(missing)
                 missing = self._upload_missing_blocks(
-                    missing,
-                    hmap,
-                    f,
-                    upload_gen)
+                    missing, hmap, f, upload_gen)
                 if missing:
                     if num_of_blocks == len(missing):
                         retries -= 1
@@ -442,9 +475,13 @@ class PithosClient(PithosRestClient):
                 else:
                     break
             if missing:
+                try:
+                    details = ['%s' % thread.exception for thread in missing]
+                except Exception:
+                    details = ['Also, failed to read thread exceptions']
                 raise ClientError(
                     '%s blocks failed to upload' % len(missing),
-                    details=['%s' % thread.exception for thread in missing])
+                    details=details)
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
@@ -578,20 +615,20 @@ class PithosClient(PithosRestClient):
                     tries -= 1
                 old_failures = len(missing)
             if missing:
-                raise ClientError(
-                    '%s blocks failed to upload' % len(missing),
-                    details=['%s' % thread.exception for thread in missing])
+                raise ClientError('%s blocks failed to upload' % len(missing))
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
                 thread.join()
             raise
+        self._cb_next()
 
         r = self.object_put(
             obj,
             format='json',
             hashmap=True,
             content_type=content_type,
+            content_encoding=content_encoding,
             if_etag_match=if_etag_match,
             if_etag_not_match='*' if if_not_exist else None,
             etag=etag,
@@ -621,15 +658,20 @@ class PithosClient(PithosRestClient):
         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
 
     def _dump_blocks_sync(
-            self, obj, remote_hashes, blocksize, total_size, dst, range,
+            self, obj, remote_hashes, blocksize, total_size, dst, crange,
             **args):
+        if not total_size:
+            return
         for blockid, blockhash in enumerate(remote_hashes):
             if blockhash:
                 start = blocksize * blockid
                 is_last = start + blocksize > total_size
                 end = (total_size - 1) if is_last else (start + blocksize - 1)
-                (start, end) = _range_up(start, end, range)
-                args['data_range'] = 'bytes=%s-%s' % (start, end)
+                data_range = _range_up(start, end, total_size, crange)
+                if not data_range:
+                    self._cb_next()
+                    continue
+                args['data_range'] = 'bytes=%s' % data_range
                 r = self.object_get(obj, success=(200, 206), **args)
                 self._cb_next()
                 dst.write(r.content)
@@ -642,7 +684,7 @@ class PithosClient(PithosRestClient):
 
     def _hash_from_file(self, fp, start, size, blockhash):
         fp.seek(start)
-        block = fp.read(size)
+        block = readall(fp, size)
         h = newhashlib(blockhash)
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
@@ -675,9 +717,6 @@ class PithosClient(PithosRestClient):
         flying = dict()
         blockid_dict = dict()
         offset = 0
-        if filerange is not None:
-            rstart = int(filerange.split('-')[0])
-            offset = rstart if blocksize > rstart else rstart % blocksize
 
         self._init_thread_limit()
         for block_hash, blockids in remote_hashes.items():
@@ -694,12 +733,15 @@ class PithosClient(PithosRestClient):
                     **restargs)
                 end = total_size - 1 if (
                     key + blocksize > total_size) else key + blocksize - 1
-                start, end = _range_up(key, end, filerange)
-                if start == end:
+                if end < key:
+                    self._cb_next()
+                    continue
+                data_range = _range_up(key, end, total_size, filerange)
+                if not data_range:
                     self._cb_next()
                     continue
-                restargs['async_headers'] = {
-                    'Range': 'bytes=%s-%s' % (start, end)}
+                restargs[
+                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                 flying[key] = self._get_block_async(obj, **restargs)
                 blockid_dict[key] = unsaved
 
@@ -842,9 +884,10 @@ class PithosClient(PithosRestClient):
                 start = blocksize * blockid
                 is_last = start + blocksize > total_size
                 end = (total_size - 1) if is_last else (start + blocksize - 1)
-                (start, end) = _range_up(start, end, range_str)
-                if start < end:
+                data_range_str = _range_up(start, end, end, range_str)
+                if data_range_str:
                     self._watch_thread_limit(flying.values())
+                    restargs['data_range'] = 'bytes=%s' % data_range_str
                     flying[blockid] = self._get_block_async(obj, **restargs)
                 for runid, thread in flying.items():
                     if (blockid + 1) == num_of_blocks:
@@ -884,8 +927,7 @@ class PithosClient(PithosRestClient):
             if_match=None,
             if_none_match=None,
             if_modified_since=None,
-            if_unmodified_since=None,
-            data_range=None):
+            if_unmodified_since=None):
         """
         :param obj: (str) remote object path
 
@@ -897,9 +939,6 @@ class PithosClient(PithosRestClient):
 
         :param if_unmodified_since: (str) formated date
 
-        :param data_range: (str) from-to where from and to are integers
-            denoting file positions in bytes
-
         :returns: (list)
         """
         try:
@@ -910,8 +949,7 @@ class PithosClient(PithosRestClient):
                 if_etag_match=if_match,
                 if_etag_not_match=if_none_match,
                 if_modified_since=if_modified_since,
-                if_unmodified_since=if_unmodified_since,
-                data_range=data_range)
+                if_unmodified_since=if_unmodified_since)
         except ClientError as err:
             if err.status == 304 or err.status == 412:
                 return {}
@@ -1066,7 +1104,7 @@ class PithosClient(PithosRestClient):
         finally:
             self.container = cnt_back_up
 
-    def get_container_info(self, until=None):
+    def get_container_info(self, container=None, until=None):
         """
         :param until: (str) formated date
 
@@ -1074,11 +1112,16 @@ class PithosClient(PithosRestClient):
 
         :raises ClientError: 404 Container not found
         """
+        bck_cont = self.container
         try:
+            self.container = container or bck_cont
+            self._assert_container()
             r = self.container_head(until=until)
         except ClientError as err:
             err.details.append('for container %s' % self.container)
             raise err
+        finally:
+            self.container = bck_cont
         return r.headers
 
     def get_container_meta(self, until=None):
@@ -1088,8 +1131,7 @@ class PithosClient(PithosRestClient):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Meta')
+            self.get_container_info(until=until), 'X-Container-Meta')
 
     def get_container_object_meta(self, until=None):
         """
@@ -1098,8 +1140,7 @@ class PithosClient(PithosRestClient):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Object-Meta')
+            self.get_container_info(until=until), 'X-Container-Object-Meta')
 
     def set_container_meta(self, metapairs):
         """
@@ -1316,6 +1357,7 @@ class PithosClient(PithosRestClient):
         finally:
             from time import sleep
             sleep(2 * len(activethreads()))
+            self._cb_next()
         return headers.values()
 
     def truncate_object(self, obj, upto_bytes):
@@ -1326,17 +1368,21 @@ class PithosClient(PithosRestClient):
 
         :returns: (dict) response headers
         """
+        ctype = self.get_object_info(obj)['content-type']
         r = self.object_post(
             obj,
             update=True,
             content_range='bytes 0-%s/*' % upto_bytes,
-            content_type='application/octet-stream',
+            content_type=ctype,
             object_bytes=upto_bytes,
             source_object=path4url(self.container, obj))
         return r.headers
 
-    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
+    def overwrite_object(
+            self, obj, start, end, source_file,
+            source_version=None, upload_cb=None):
         """Overwrite a part of an object from local source file
+        ATTENTION: content_type must always be application/octet-stream
 
         :param obj: (str) remote object path
 
@@ -1349,21 +1395,16 @@ class PithosClient(PithosRestClient):
         :param upload_db: progress.bar for uploading
         """
 
-        r = self.get_object_info(obj)
-        rf_size = int(r['content-length'])
-        if rf_size < int(start):
-            raise ClientError(
-                'Range start exceeds file size',
-                status=416)
-        elif rf_size < int(end):
-            raise ClientError(
-                'Range end exceeds file size',
-                status=416)
         self._assert_container()
+        r = self.get_object_info(obj, version=source_version)
+        rf_size = int(r['content-length'])
+        start, end = int(start), int(end)
+        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
+            start, rf_size)
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
-        datasize = int(end) - int(start) + 1
+        datasize = end - start + 1
         nblocks = 1 + (datasize - 1) // blocksize
         offset = 0
         if upload_cb:
@@ -1381,11 +1422,12 @@ class PithosClient(PithosRestClient):
                 content_range='bytes %s-%s/*' % (
                     start + offset,
                     start + offset + len(block) - 1),
+                source_version=source_version,
                 data=block)
             headers.append(dict(r.headers))
             offset += len(block)
-
-            self._cb_next
+            self._cb_next()
+        self._cb_next()
         return headers
 
     def copy_object(