+ permissions=sharing,
+ public=public,
+ success=201)
+ return r.headers
+
+ def upload_from_string(
+ self, obj, input_str,
+ hash_cb=None,
+ upload_cb=None,
+ etag=None,
+ if_etag_match=None,
+ if_not_exist=None,
+ content_encoding=None,
+ content_disposition=None,
+ content_type=None,
+ sharing=None,
+ public=None,
+ container_info_cache=None):
+ """Upload an object using multiple connections (threads)
+
+ :param obj: (str) remote object path
+
+ :param input_str: (str) upload content
+
+ :param hash_cb: optional progress.bar object for calculating hashes
+
+ :param upload_cb: optional progress.bar object for uploading
+
+ :param etag: (str)
+
+ :param if_etag_match: (str) Push that value to if-match header at file
+ creation
+
+ :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
+ it does not exist remotely, otherwise the operation will fail.
+ Involves the case of an object with the same path is created while
+ the object is being uploaded.
+
+ :param content_encoding: (str)
+
+ :param content_disposition: (str)
+
+ :param content_type: (str)
+
+ :param sharing: {'read':[user and/or grp names],
+ 'write':[usr and/or grp names]}
+
+ :param public: (bool)
+
+ :param container_info_cache: (dict) if given, avoid redundant calls to
+ server for container info (block size and hash information)
+ """
+ self._assert_container()
+
+ blocksize, blockhash, size, nblocks = self._get_file_block_info(
+ fileobj=None, size=len(input_str), cache=container_info_cache)
+ (hashes, hmap, offset) = ([], {}, 0)
+ if not content_type:
+ content_type = 'application/octet-stream'
+
+ hashes = []
+ hmap = {}
+ for blockid in range(nblocks):
+ start = blockid * blocksize
+ block = input_str[start: (start + blocksize)]
+ hashes.append(_pithos_hash(block, blockhash))
+ hmap[hashes[blockid]] = (start, block)
+
+ hashmap = dict(bytes=size, hashes=hashes)
+ missing, obj_headers = self._create_object_or_get_missing_hashes(
+ obj, hashmap,
+ content_type=content_type,
+ size=size,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ content_encoding=content_encoding,
+ content_disposition=content_disposition,
+ permissions=sharing,
+ public=public)
+ if missing is None:
+ return obj_headers
+ num_of_missing = len(missing)
+
+ if upload_cb:
+ self.progress_bar_gen = upload_cb(nblocks)
+ for i in range(nblocks + 1 - num_of_missing):
+ self._cb_next()
+
+ tries = 7
+ old_failures = 0
+ try:
+ while tries and missing:
+ flying = []
+ failures = []
+ for hash in missing:
+ offset, block = hmap[hash]
+ bird = self._put_block_async(block, hash)
+ flying.append(bird)
+ unfinished = self._watch_thread_limit(flying)
+ for thread in set(flying).difference(unfinished):
+ if thread.exception:
+ failures.append(thread.kwargs['hash'])
+ if thread.isAlive():
+ flying.append(thread)
+ else:
+ self._cb_next()
+ flying = unfinished
+ for thread in flying:
+ thread.join()
+ if thread.exception:
+ failures.append(thread.kwargs['hash'])
+ self._cb_next()
+ missing = failures
+ if missing and len(missing) == old_failures:
+ tries -= 1
+ old_failures = len(missing)
+ if missing:
+ raise ClientError(
+ '%s blocks failed to upload' % len(missing),
+ details=['%s' % thread.exception for thread in missing])
+ except KeyboardInterrupt:
+ sendlog.info('- - - wait for threads to finish')
+ for thread in activethreads():
+ thread.join()
+ raise
+
+ r = self.object_put(
+ obj,
+ format='json',
+ hashmap=True,
+ content_type=content_type,
+ if_etag_match=if_etag_match,
+ if_etag_not_match='*' if if_not_exist else None,
+ etag=etag,
+ json=hashmap,
+ permissions=sharing,
+ public=public,