Update copyright dates for changes files
[kamaki] / kamaki / clients / pithos / __init__.py
index 3dbcd66..4eb7153 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright 2011-2013 GRNET S.A. All rights reserved.
+# Copyright 2011-2014 GRNET S.A. All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or
 # without modification, are permitted provided that the following
@@ -43,7 +43,7 @@ from binascii import hexlify
 from kamaki.clients import SilentEvent, sendlog
 from kamaki.clients.pithos.rest_api import PithosRestClient
 from kamaki.clients.storage import ClientError
-from kamaki.clients.utils import path4url, filter_in
+from kamaki.clients.utils import path4url, filter_in, readall
 
 
 def _pithos_hash(block, blockhash):
@@ -132,8 +132,7 @@ class PithosClient(PithosRestClient):
             self.container = cnt_back_up
 
     def purge_container(self, container=None):
-        """Delete an empty container and destroy associated blocks
-        """
+        """Delete an empty container and destroy associated blocks"""
         cnt_back_up = self.container
         try:
             self.container = container or cnt_back_up
@@ -190,7 +189,7 @@ class PithosClient(PithosRestClient):
                 raise ClientError(msg, 1)
             f = StringIO(data)
         else:
-            data = f.read(size) if size else f.read()
+            data = readall(f, size) if size else f.read()
         r = self.object_put(
             obj,
             data=data,
@@ -316,17 +315,19 @@ class PithosClient(PithosRestClient):
             hash_gen = hash_cb(nblocks)
             hash_gen.next()
 
-        for i in range(nblocks):
-            block = fileobj.read(min(blocksize, size - offset))
+        for i in xrange(nblocks):
+            block = readall(fileobj, min(blocksize, size - offset))
             bytes = len(block)
+            if bytes <= 0:
+                break
             hash = _pithos_hash(block, blockhash)
             hashes.append(hash)
             hmap[hash] = (offset, bytes)
             offset += bytes
             if hash_cb:
                 hash_gen.next()
-        msg = 'Failed to calculate uploaded blocks:'
-        ' Offset and object size do not match'
+        msg = ('Failed to calculate uploading blocks: '
+               'read bytes(%s) != requested size (%s)' % (offset, size))
         assert offset == size, msg
 
     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
@@ -339,7 +340,7 @@ class PithosClient(PithosRestClient):
         for hash in missing:
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
-            data = fileobj.read(bytes)
+            data = readall(fileobj, bytes)
             r = self._put_block_async(data, hash)
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
@@ -425,8 +426,7 @@ class PithosClient(PithosRestClient):
             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                 f, size, container_info_cache)
         (hashes, hmap, offset) = ([], {}, 0)
-        if not content_type:
-            content_type = 'application/octet-stream'
+        content_type = content_type or 'application/octet-stream'
 
         self._calculate_blocks_for_upload(
             *block_info,
@@ -466,10 +466,7 @@ class PithosClient(PithosRestClient):
                 sendlog.info('%s blocks missing' % len(missing))
                 num_of_blocks = len(missing)
                 missing = self._upload_missing_blocks(
-                    missing,
-                    hmap,
-                    f,
-                    upload_gen)
+                    missing, hmap, f, upload_gen)
                 if missing:
                     if num_of_blocks == len(missing):
                         retries -= 1
@@ -618,14 +615,13 @@ class PithosClient(PithosRestClient):
                     tries -= 1
                 old_failures = len(missing)
             if missing:
-                raise ClientError(
-                    '%s blocks failed to upload' % len(missing),
-                    details=['%s' % thread.exception for thread in missing])
+                raise ClientError('%s blocks failed to upload' % len(missing))
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():
                 thread.join()
             raise
+        self._cb_next()
 
         r = self.object_put(
             obj,
@@ -664,6 +660,8 @@ class PithosClient(PithosRestClient):
     def _dump_blocks_sync(
             self, obj, remote_hashes, blocksize, total_size, dst, crange,
             **args):
+        if not total_size:
+            return
         for blockid, blockhash in enumerate(remote_hashes):
             if blockhash:
                 start = blocksize * blockid
@@ -686,7 +684,7 @@ class PithosClient(PithosRestClient):
 
     def _hash_from_file(self, fp, start, size, blockhash):
         fp.seek(start)
-        block = fp.read(size)
+        block = readall(fp, size)
         h = newhashlib(blockhash)
         h.update(block.strip('\x00'))
         return hexlify(h.digest())
@@ -729,21 +727,23 @@ class PithosClient(PithosRestClient):
             self._cb_next(len(blockids) - len(unsaved))
             if unsaved:
                 key = unsaved[0]
-                if key:
-                    self._watch_thread_limit(flying.values())
-                    self._thread2file(
-                        flying, blockid_dict, local_file, offset,
-                        **restargs)
-                    end = total_size - 1 if (
-                        key + blocksize > total_size) else key + blocksize - 1
-                    data_range = _range_up(key, end, total_size, filerange)
-                    if not data_range:
-                        self._cb_next()
-                        continue
-                    restargs[
-                        'async_headers'] = {'Range': 'bytes=%s' % data_range}
-                    flying[key] = self._get_block_async(obj, **restargs)
-                    blockid_dict[key] = unsaved
+                self._watch_thread_limit(flying.values())
+                self._thread2file(
+                    flying, blockid_dict, local_file, offset,
+                    **restargs)
+                end = total_size - 1 if (
+                    key + blocksize > total_size) else key + blocksize - 1
+                if end < key:
+                    self._cb_next()
+                    continue
+                data_range = _range_up(key, end, total_size, filerange)
+                if not data_range:
+                    self._cb_next()
+                    continue
+                restargs[
+                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
+                flying[key] = self._get_block_async(obj, **restargs)
+                blockid_dict[key] = unsaved
 
         for thread in flying.values():
             thread.join()
@@ -1104,7 +1104,7 @@ class PithosClient(PithosRestClient):
         finally:
             self.container = cnt_back_up
 
-    def get_container_info(self, until=None):
+    def get_container_info(self, container=None, until=None):
         """
         :param until: (str) formated date
 
@@ -1112,11 +1112,16 @@ class PithosClient(PithosRestClient):
 
         :raises ClientError: 404 Container not found
         """
+        bck_cont = self.container
         try:
+            self.container = container or bck_cont
+            self._assert_container()
             r = self.container_head(until=until)
         except ClientError as err:
             err.details.append('for container %s' % self.container)
             raise err
+        finally:
+            self.container = bck_cont
         return r.headers
 
     def get_container_meta(self, until=None):
@@ -1126,8 +1131,7 @@ class PithosClient(PithosRestClient):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Meta')
+            self.get_container_info(until=until), 'X-Container-Meta')
 
     def get_container_object_meta(self, until=None):
         """
@@ -1136,8 +1140,7 @@ class PithosClient(PithosRestClient):
         :returns: (dict)
         """
         return filter_in(
-            self.get_container_info(until=until),
-            'X-Container-Object-Meta')
+            self.get_container_info(until=until), 'X-Container-Object-Meta')
 
     def set_container_meta(self, metapairs):
         """
@@ -1354,6 +1357,7 @@ class PithosClient(PithosRestClient):
         finally:
             from time import sleep
             sleep(2 * len(activethreads()))
+            self._cb_next()
         return headers.values()
 
     def truncate_object(self, obj, upto_bytes):
@@ -1364,17 +1368,21 @@ class PithosClient(PithosRestClient):
 
         :returns: (dict) response headers
         """
+        ctype = self.get_object_info(obj)['content-type']
         r = self.object_post(
             obj,
             update=True,
             content_range='bytes 0-%s/*' % upto_bytes,
-            content_type='application/octet-stream',
+            content_type=ctype,
             object_bytes=upto_bytes,
             source_object=path4url(self.container, obj))
         return r.headers
 
-    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
+    def overwrite_object(
+            self, obj, start, end, source_file,
+            source_version=None, upload_cb=None):
         """Overwrite a part of an object from local source file
+        ATTENTION: content_type must always be application/octet-stream
 
         :param obj: (str) remote object path
 
@@ -1387,21 +1395,16 @@ class PithosClient(PithosRestClient):
         :param upload_db: progress.bar for uploading
         """
 
-        r = self.get_object_info(obj)
-        rf_size = int(r['content-length'])
-        if rf_size < int(start):
-            raise ClientError(
-                'Range start exceeds file size',
-                status=416)
-        elif rf_size < int(end):
-            raise ClientError(
-                'Range end exceeds file size',
-                status=416)
         self._assert_container()
+        r = self.get_object_info(obj, version=source_version)
+        rf_size = int(r['content-length'])
+        start, end = int(start), int(end)
+        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
+            start, rf_size)
         meta = self.get_container_info()
         blocksize = int(meta['x-container-block-size'])
         filesize = fstat(source_file.fileno()).st_size
-        datasize = int(end) - int(start) + 1
+        datasize = end - start + 1
         nblocks = 1 + (datasize - 1) // blocksize
         offset = 0
         if upload_cb:
@@ -1419,11 +1422,12 @@ class PithosClient(PithosRestClient):
                 content_range='bytes %s-%s/*' % (
                     start + offset,
                     start + offset + len(block) - 1),
+                source_version=source_version,
                 data=block)
             headers.append(dict(r.headers))
             offset += len(block)
-
-            self._cb_next
+            self._cb_next()
+        self._cb_next()
         return headers
 
     def copy_object(