return r.headers
# upload_* auxiliary methods
- def _put_block_async(self, data, hash, upload_gen=None):
+ def _put_block_async(self, data, hash):
event = SilentEvent(method=self._put_block, data=data, hash=hash)
event.start()
return event
nblocks = 1 + (size - 1) // blocksize
return (blocksize, blockhash, size, nblocks)
- def _create_or_get_missing_hashes(
+ def _create_object_or_get_missing_hashes(
self, obj, json,
size=None,
format='json',
offset, bytes = hmap[hash]
fileobj.seek(offset)
data = fileobj.read(bytes)
- r = self._put_block_async(data, hash, upload_gen)
+ r = self._put_block_async(data, hash)
flying.append(r)
unfinished = self._watch_thread_limit(flying)
for thread in set(flying).difference(unfinished):
"""
self._assert_container()
- #init
block_info = (
blocksize, blockhash, size, nblocks) = self._get_file_block_info(
f, size, container_info_cache)
hash_cb=hash_cb)
hashmap = dict(bytes=size, hashes=hashes)
- missing, obj_headers = self._create_or_get_missing_hashes(
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
obj, hashmap,
content_type=content_type,
size=size,
else:
break
if missing:
- raise ClientError(
- '%s blocks failed to upload' % len(missing),
- status=800)
+ raise ClientError('%s blocks failed to upload' % len(missing))
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ raise
+
+ r = self.object_put(
+ obj,
+ format='json',
+ hashmap=True,
+ content_type=content_type,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ etag=etag,
+ json=hashmap,
+ permissions=sharing,
+ public=public,
+ success=201)
+ return r.headers
+
+
+ def upload_from_string(
+ self, obj, input_str,
+ hash_cb=None,
+ upload_cb=None,
+ etag=None,
+ if_etag_match=None,
+ if_not_exist=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None,
+ container_info_cache=None):
+ """Upload an object from a string using multiple connections (threads)
+
+ :param obj: (str) remote object path
+
+ :param input_str: (str) upload content
+
+ :param hash_cb: optional progress.bar object for calculating hashes
+
+ :param upload_cb: optional progress.bar object for uploading
+
+ :param etag: (str)
+
+ :param if_etag_match: (str) Push that value to if-match header at file
+ creation
+
+ :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+ it does not exist remotely, otherwise the operation will fail.
+ This covers the case where an object with the same path is created
+ while this object is being uploaded.
+
+ :param content_encoding: (str)
+
+ :param content_disposition: (str)
+
+ :param content_type: (str)
+
+ :param sharing: {'read':[user and/or grp names],
+ 'write':[usr and/or grp names]}
+
+ :param public: (bool)
+
+ :param container_info_cache: (dict) if given, avoid redundant calls to
+ server for container info (block size and hash information)
+ """
+ self._assert_container()
+
+ blocksize, blockhash, size, nblocks = self._get_file_block_info(
+ fileobj=None, size=len(input_str), cache=container_info_cache)
+ (hashes, hmap, offset) = ([], {}, 0)
+ if not content_type:
+ content_type = 'application/octet-stream'
+
+ num_of_blocks, blockmod = size / blocksize, size % blocksize
+ num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
+
+ hashes = {}
+ hmap = {}
+ for blockid in range(num_of_blocks):
+ start = blockid * blocksize
+ block = input_str[start: (start + blocksize)]
+ hashes[blockid] = _pithos_hash(block, blockhash)
+ hmap[hashes[blockid]] = (start, block)
+
+ hashmap = dict(bytes=size, hashes=hashes)
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
+ obj, hashmap,
+ content_type=content_type,
+ size=size,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ content_encoding=content_encoding,
+ content_disposition=content_disposition,
+ permissions=sharing,
+ public=public)
+ if missing is None:
+ return obj_headers
+ num_of_missing = len(missing)
+
+ if upload_cb:
+ self.progress_bar_gen = upload_cb(num_of_blocks)
+ for i in range(num_of_blocks + 1 - num_of_missing):
+ self._cb_next()
+
+ try:
+ flying = []
+ for hash in missing:
+ offset, block = hmap[hash]
+ bird = self._put_block_async(block, hash)
+ flying.append(bird)
+ unfinished = self._watch_thread_limit(flying)
+ for thread in set(flying).difference(unfinished):
+ if thread.exception:
+ raise thread.exception
+ if thread.isAlive():
+ flying.append(thread)
+ else:
+ self._cb_next()
+ flying = unfinished
+ for thread in flying:
+ thread.join()
+ if thread.exception:
+ raise thread.exception
+ self._cb_next()
+
except KeyboardInterrupt:
sendlog.info('- - - wait for threads to finish')
for thread in activethreads():