Implement upload_from_string
author: Stavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 16 May 2013 16:27:54 +0000 (19:27 +0300)
committer: Stavros Sachtouris <saxtouri@admin.grnet.gr>
Thu, 16 May 2013 16:27:54 +0000 (19:27 +0300)
Implemented with threads

Refs: #3608

kamaki/clients/livetest/pithos.py
kamaki/clients/pithos/__init__.py

index d6acc29..da29d93 100644 (file)
@@ -690,6 +690,16 @@ class Pithos(livetest.Generic):
                 range_str='%s-%s' % (pos, (pos + 128)))
             self.assertEqual(tmp_s, src_f.read(len(tmp_s)))
 
+        print('\tUploading KiBs as strings...')
+        trg_fname = 'fromString_%s' % self.now
+        src_size = 2 * 1024
+        src_f.seek(0)
+        src_str = src_f.read(src_size)
+        self.client.upload_from_string(trg_fname, src_str)
+        print('\tDownload as string and check...')
+        tmp_s = self.client.download_to_string(trg_fname)
+        self.assertEqual(tmp_s, src_str)
+
         """Upload a boring file"""
         trg_fname = 'boringfile_%s' % self.now
         src_f = self.create_boring_file(42)
index 74dd913..e4c85aa 100644 (file)
@@ -182,7 +182,7 @@ class PithosClient(PithosRestClient):
         return r.headers
 
     # upload_* auxiliary methods
-    def _put_block_async(self, data, hash, upload_gen=None):
+    def _put_block_async(self, data, hash):
         event = SilentEvent(method=self._put_block, data=data, hash=hash)
         event.start()
         return event
@@ -219,7 +219,7 @@ class PithosClient(PithosRestClient):
         nblocks = 1 + (size - 1) // blocksize
         return (blocksize, blockhash, size, nblocks)
 
-    def _create_or_get_missing_hashes(
+    def _create_object_or_get_missing_hashes(
             self, obj, json,
             size=None,
             format='json',
@@ -279,7 +279,7 @@ class PithosClient(PithosRestClient):
             offset, bytes = hmap[hash]
             fileobj.seek(offset)
             data = fileobj.read(bytes)
-            r = self._put_block_async(data, hash, upload_gen)
+            r = self._put_block_async(data, hash)
             flying.append(r)
             unfinished = self._watch_thread_limit(flying)
             for thread in set(flying).difference(unfinished):
@@ -360,7 +360,6 @@ class PithosClient(PithosRestClient):
         """
         self._assert_container()
 
-        #init
         block_info = (
             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                 f, size, container_info_cache)
@@ -376,7 +375,7 @@ class PithosClient(PithosRestClient):
             hash_cb=hash_cb)
 
         hashmap = dict(bytes=size, hashes=hashes)
-        missing, obj_headers = self._create_or_get_missing_hashes(
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
             obj, hashmap,
             content_type=content_type,
             size=size,
@@ -418,9 +417,135 @@ class PithosClient(PithosRestClient):
                 else:
                     break
             if missing:
-                raise ClientError(
-                    '%s blocks failed to upload' % len(missing),
-                    status=800)
+                raise ClientError('%s blocks failed to upload' % len(missing))
+        except KeyboardInterrupt:
+            sendlog.info('- - - wait for threads to finish')
+            for thread in activethreads():
+                thread.join()
+            raise
+
+        r = self.object_put(
+            obj,
+            format='json',
+            hashmap=True,
+            content_type=content_type,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            etag=etag,
+            json=hashmap,
+            permissions=sharing,
+            public=public,
+            success=201)
+        return r.headers
+
+
+    def upload_from_string(
+            self, obj, input_str,
+            hash_cb=None,
+            upload_cb=None,
+            etag=None,
+            if_etag_match=None,
+            if_not_exist=None,
+            content_encoding=None,
+            content_disposition=None,
+            content_type=None,
+            sharing=None,
+            public=None,
+            container_info_cache=None):
+        """Upload an object using multiple connections (threads)
+
+        :param obj: (str) remote object path
+
+        :param input_str: (str) upload content
+
+        :param hash_cb: optional progress.bar object for calculating hashes
+
+        :param upload_cb: optional progress.bar object for uploading
+
+        :param etag: (str)
+
+        :param if_etag_match: (str) Push that value to if-match header at file
+            creation
+
+        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+            it does not exist remotely, otherwise the operation will fail.
+            Involves the case of an object with the same path is created while
+            the object is being uploaded.
+
+        :param content_encoding: (str)
+
+        :param content_disposition: (str)
+
+        :param content_type: (str)
+
+        :param sharing: {'read':[user and/or grp names],
+            'write':[usr and/or grp names]}
+
+        :param public: (bool)
+
+        :param container_info_cache: (dict) if given, avoid redundant calls to
+        server for container info (block size and hash information)
+        """
+        self._assert_container()
+
+        blocksize, blockhash, size, nblocks = self._get_file_block_info(
+                fileobj=None, size=len(input_str), cache=container_info_cache)
+        (hashes, hmap, offset) = ([], {}, 0)
+        if not content_type:
+            content_type = 'application/octet-stream'
+
+        num_of_blocks, blockmod = size / blocksize, size % blocksize
+        num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
+
+        hashes = {}
+        hmap = {}
+        for blockid in range(num_of_blocks):
+            start = blockid * blocksize
+            block = input_str[start: (start + blocksize)]
+            hashes[blockid] = _pithos_hash(block, blockhash)
+            hmap[hashes[blockid]] = (start, block)
+
+        hashmap = dict(bytes=size, hashes=hashes)
+        missing, obj_headers = self._create_object_or_get_missing_hashes(
+            obj, hashmap,
+            content_type=content_type,
+            size=size,
+            if_etag_match=if_etag_match,
+            if_etag_not_match='*' if if_not_exist else None,
+            content_encoding=content_encoding,
+            content_disposition=content_disposition,
+            permissions=sharing,
+            public=public)
+        if missing is None:
+            return obj_headers
+        num_of_missing = len(missing)
+
+        if upload_cb:
+            self.progress_bar_gen = upload_cb(num_of_blocks)
+            for i in range(num_of_blocks + 1 - num_of_missing):
+                self._cb_next()
+
+        try:
+            flying = []
+            for hash in missing:
+                offset, block = hmap[hash]
+                bird = self._put_block_async(block, hash)
+                flying.append(bird)
+                unfinished = self._watch_thread_limit(flying)
+                for thread in set(flying).difference(unfinished):
+                    if thread.exception:
+                        raise thread.exception
+                    if thread.isAlive():
+                        flying.append(thread)
+                    else:
+                        self._cb_next()
+                flying = unfinished
+            for thread in flying:
+                thread.join()
+                if thread.exception:
+                    raise thread.exception
+                self._cb_next()
+
         except KeyboardInterrupt:
             sendlog.info('- - - wait for threads to finish')
             for thread in activethreads():