1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, max_value, a_range):
57 :param start: (int) the window bottom
59 :param end: (int) the window top
61 :param max_value: (int) maximum accepted value
63 :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64 where X: x|x-y|-x where x < y and x, y natural numbers
66 :returns: (str) a range string cut-off for the start-end range
67 an empty response means this window is out of range
69 assert start >= 0, '_range_up called w. start(%s) < 0' % start
70 assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
72 assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
75 return '%s-%s' % (start, end)
77 for some_range in a_range.split(','):
78 v0, sep, v1 = some_range.partition('-')
83 if v1 < start or v0 > end or v1 < v0:
85 v0 = v0 if v0 > start else start
86 v1 = v1 if v1 < end else end
87 selected.append('%s-%s' % (v0, v1))
91 v1 = v0 if v0 <= end else end
92 selected.append('%s-%s' % (start, v1))
95 if max_value - v1 > end:
97 v0 = (max_value - v1) if max_value - v1 > start else start
98 selected.append('%s-%s' % (v0, end))
99 return ','.join(selected)
class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        """Bind the client to an endpoint, token, account and container."""
        super(PithosClient, self).__init__(base_url, token, account, container)
108 def create_container(
110 container=None, sizelimit=None, versioning=None, metadata=None,
113 :param container: (str) if not given, self.container is used instead
115 :param sizelimit: (int) container total size limit in bytes
117 :param versioning: (str) can be auto or whatever supported by server
119 :param metadata: (dict) Custom user-defined metadata of the form
120 { 'name1': 'value1', 'name2': 'value2', ... }
122 :returns: (dict) response headers
124 cnt_back_up = self.container
126 self.container = container or cnt_back_up
127 r = self.container_put(
128 quota=sizelimit, versioning=versioning, metadata=metadata,
132 self.container = cnt_back_up
134 def purge_container(self, container=None):
135 """Delete an empty container and destroy associated blocks
137 cnt_back_up = self.container
139 self.container = container or cnt_back_up
140 r = self.container_delete(until=unicode(time()))
142 self.container = cnt_back_up
145 def upload_object_unchunked(
150 content_encoding=None,
151 content_disposition=None,
156 :param obj: (str) remote object path
158 :param f: open file descriptor
160 :param withHashFile: (bool)
162 :param size: (int) size of data to upload
166 :param content_encoding: (str)
168 :param content_disposition: (str)
170 :param content_type: (str)
172 :param sharing: {'read':[user and/or grp names],
173 'write':[usr and/or grp names]}
175 :param public: (bool)
177 :returns: (dict) created object metadata
179 self._assert_container()
185 data = json.dumps(json.loads(data))
187 raise ClientError('"%s" is not json-formated' % f.name, 1)
189 msg = '"%s" is not a valid hashmap file' % f.name
190 raise ClientError(msg, 1)
193 data = readall(f, size) if size else f.read()
198 content_encoding=content_encoding,
199 content_disposition=content_disposition,
200 content_type=content_type,
206 def create_object_by_manifestation(
209 content_encoding=None,
210 content_disposition=None,
215 :param obj: (str) remote object path
219 :param content_encoding: (str)
221 :param content_disposition: (str)
223 :param content_type: (str)
225 :param sharing: {'read':[user and/or grp names],
226 'write':[usr and/or grp names]}
228 :param public: (bool)
230 :returns: (dict) created object metadata
232 self._assert_container()
237 content_encoding=content_encoding,
238 content_disposition=content_disposition,
239 content_type=content_type,
242 manifest='%s/%s' % (self.container, obj))
245 # upload_* auxiliary methods
246 def _put_block_async(self, data, hash):
247 event = SilentEvent(method=self._put_block, data=data, hash=hash)
251 def _put_block(self, data, hash):
252 r = self.container_post(
254 content_type='application/octet-stream',
255 content_length=len(data),
258 assert r.json[0] == hash, 'Local hash does not match server'
260 def _get_file_block_info(self, fileobj, size=None, cache=None):
262 :param fileobj: (file descriptor) source
264 :param size: (int) size of data to upload from source
266 :param cache: (dict) if provided, cache container info response to
267 avoid redundant calls
269 if isinstance(cache, dict):
271 meta = cache[self.container]
273 meta = self.get_container_info()
274 cache[self.container] = meta
276 meta = self.get_container_info()
277 blocksize = int(meta['x-container-block-size'])
278 blockhash = meta['x-container-block-hash']
279 size = size if size is not None else fstat(fileobj.fileno()).st_size
280 nblocks = 1 + (size - 1) // blocksize
281 return (blocksize, blockhash, size, nblocks)
283 def _create_object_or_get_missing_hashes(
290 if_etag_not_match=None,
291 content_encoding=None,
292 content_disposition=None,
300 content_type=content_type,
302 if_etag_match=if_etag_match,
303 if_etag_not_match=if_etag_not_match,
304 content_encoding=content_encoding,
305 content_disposition=content_disposition,
306 permissions=permissions,
309 return (None if r.status_code == 201 else r.json), r.headers
311 def _calculate_blocks_for_upload(
312 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
316 hash_gen = hash_cb(nblocks)
319 for i in range(nblocks):
320 block = readall(fileobj, min(blocksize, size - offset))
322 hash = _pithos_hash(block, blockhash)
324 hmap[hash] = (offset, bytes)
328 msg = ('Failed to calculate uploaded blocks:'
329 ' Offset and object size do not match')
330 assert offset == size, msg
332 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
333 """upload missing blocks asynchronously"""
335 self._init_thread_limit()
340 offset, bytes = hmap[hash]
342 data = readall(fileobj, bytes)
343 r = self._put_block_async(data, hash)
345 unfinished = self._watch_thread_limit(flying)
346 for thread in set(flying).difference(unfinished):
348 failures.append(thread)
351 ClientError) and thread.exception.status == 502:
352 self.POOLSIZE = self._thread_limit
353 elif thread.isAlive():
354 flying.append(thread)
362 for thread in flying:
365 failures.append(thread)
372 return [failure.kwargs['hash'] for failure in failures]
382 content_encoding=None,
383 content_disposition=None,
387 container_info_cache=None):
388 """Upload an object using multiple connections (threads)
390 :param obj: (str) remote object path
392 :param f: open file descriptor (rb)
394 :param hash_cb: optional progress.bar object for calculating hashes
396 :param upload_cb: optional progress.bar object for uploading
400 :param if_etag_match: (str) Push that value to if-match header at file
403 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
404 it does not exist remotely, otherwise the operation will fail.
405 Involves the case of an object with the same path is created while
406 the object is being uploaded.
408 :param content_encoding: (str)
410 :param content_disposition: (str)
412 :param content_type: (str)
414 :param sharing: {'read':[user and/or grp names],
415 'write':[usr and/or grp names]}
417 :param public: (bool)
419 :param container_info_cache: (dict) if given, avoid redundant calls to
420 server for container info (block size and hash information)
422 self._assert_container()
425 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
426 f, size, container_info_cache)
427 (hashes, hmap, offset) = ([], {}, 0)
429 content_type = 'application/octet-stream'
431 self._calculate_blocks_for_upload(
438 hashmap = dict(bytes=size, hashes=hashes)
439 missing, obj_headers = self._create_object_or_get_missing_hashes(
441 content_type=content_type,
443 if_etag_match=if_etag_match,
444 if_etag_not_match='*' if if_not_exist else None,
445 content_encoding=content_encoding,
446 content_disposition=content_disposition,
454 upload_gen = upload_cb(len(missing))
455 for i in range(len(missing), len(hashmap['hashes']) + 1):
466 sendlog.info('%s blocks missing' % len(missing))
467 num_of_blocks = len(missing)
468 missing = self._upload_missing_blocks(
474 if num_of_blocks == len(missing):
477 num_of_blocks = len(missing)
482 details = ['%s' % thread.exception for thread in missing]
484 details = ['Also, failed to read thread exceptions']
486 '%s blocks failed to upload' % len(missing),
488 except KeyboardInterrupt:
489 sendlog.info('- - - wait for threads to finish')
490 for thread in activethreads():
498 content_type=content_type,
499 content_encoding=content_encoding,
500 if_etag_match=if_etag_match,
501 if_etag_not_match='*' if if_not_exist else None,
509 def upload_from_string(
510 self, obj, input_str,
516 content_encoding=None,
517 content_disposition=None,
521 container_info_cache=None):
522 """Upload an object using multiple connections (threads)
524 :param obj: (str) remote object path
526 :param input_str: (str) upload content
528 :param hash_cb: optional progress.bar object for calculating hashes
530 :param upload_cb: optional progress.bar object for uploading
534 :param if_etag_match: (str) Push that value to if-match header at file
537 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
538 it does not exist remotely, otherwise the operation will fail.
539 Involves the case of an object with the same path is created while
540 the object is being uploaded.
542 :param content_encoding: (str)
544 :param content_disposition: (str)
546 :param content_type: (str)
548 :param sharing: {'read':[user and/or grp names],
549 'write':[usr and/or grp names]}
551 :param public: (bool)
553 :param container_info_cache: (dict) if given, avoid redundant calls to
554 server for container info (block size and hash information)
556 self._assert_container()
558 blocksize, blockhash, size, nblocks = self._get_file_block_info(
559 fileobj=None, size=len(input_str), cache=container_info_cache)
560 (hashes, hmap, offset) = ([], {}, 0)
562 content_type = 'application/octet-stream'
566 for blockid in range(nblocks):
567 start = blockid * blocksize
568 block = input_str[start: (start + blocksize)]
569 hashes.append(_pithos_hash(block, blockhash))
570 hmap[hashes[blockid]] = (start, block)
572 hashmap = dict(bytes=size, hashes=hashes)
573 missing, obj_headers = self._create_object_or_get_missing_hashes(
575 content_type=content_type,
577 if_etag_match=if_etag_match,
578 if_etag_not_match='*' if if_not_exist else None,
579 content_encoding=content_encoding,
580 content_disposition=content_disposition,
585 num_of_missing = len(missing)
588 self.progress_bar_gen = upload_cb(nblocks)
589 for i in range(nblocks + 1 - num_of_missing):
595 while tries and missing:
599 offset, block = hmap[hash]
600 bird = self._put_block_async(block, hash)
602 unfinished = self._watch_thread_limit(flying)
603 for thread in set(flying).difference(unfinished):
605 failures.append(thread.kwargs['hash'])
607 flying.append(thread)
611 for thread in flying:
614 failures.append(thread.kwargs['hash'])
617 if missing and len(missing) == old_failures:
619 old_failures = len(missing)
622 '%s blocks failed to upload' % len(missing),
623 details=['%s' % thread.exception for thread in missing])
624 except KeyboardInterrupt:
625 sendlog.info('- - - wait for threads to finish')
626 for thread in activethreads():
634 content_type=content_type,
635 content_encoding=content_encoding,
636 if_etag_match=if_etag_match,
637 if_etag_not_match='*' if if_not_exist else None,
645 # download_* auxiliary methods
646 def _get_remote_blocks_info(self, obj, **restargs):
647 #retrieve object hashmap
648 myrange = restargs.pop('data_range', None)
649 hashmap = self.get_object_hashmap(obj, **restargs)
650 restargs['data_range'] = myrange
651 blocksize = int(hashmap['block_size'])
652 blockhash = hashmap['block_hash']
653 total_size = hashmap['bytes']
654 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
656 for i, h in enumerate(hashmap['hashes']):
657 # map_dict[h] = i CHAGE
659 map_dict[h].append(i)
662 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
664 def _dump_blocks_sync(
665 self, obj, remote_hashes, blocksize, total_size, dst, crange,
669 for blockid, blockhash in enumerate(remote_hashes):
671 start = blocksize * blockid
672 is_last = start + blocksize > total_size
673 end = (total_size - 1) if is_last else (start + blocksize - 1)
674 data_range = _range_up(start, end, total_size, crange)
678 args['data_range'] = 'bytes=%s' % data_range
679 r = self.object_get(obj, success=(200, 206), **args)
684 def _get_block_async(self, obj, **args):
685 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
689 def _hash_from_file(self, fp, start, size, blockhash):
691 block = readall(fp, size)
692 h = newhashlib(blockhash)
693 h.update(block.strip('\x00'))
694 return hexlify(h.digest())
696 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
697 """write the results of a greenleted rest call to a file
699 :param offset: the offset of the file up to blocksize
700 - e.g. if the range is 10-100, all blocks will be written to
703 for key, g in flying.items():
708 block = g.value.content
709 for block_start in blockids[key]:
710 local_file.seek(block_start + offset)
711 local_file.write(block)
717 def _dump_blocks_async(
718 self, obj, remote_hashes, blocksize, total_size, local_file,
719 blockhash=None, resume=False, filerange=None, **restargs):
720 file_size = fstat(local_file.fileno()).st_size if resume else 0
722 blockid_dict = dict()
725 self._init_thread_limit()
726 for block_hash, blockids in remote_hashes.items():
727 blockids = [blk * blocksize for blk in blockids]
728 unsaved = [blk for blk in blockids if not (
729 blk < file_size and block_hash == self._hash_from_file(
730 local_file, blk, blocksize, blockhash))]
731 self._cb_next(len(blockids) - len(unsaved))
734 self._watch_thread_limit(flying.values())
736 flying, blockid_dict, local_file, offset,
738 end = total_size - 1 if (
739 key + blocksize > total_size) else key + blocksize - 1
743 data_range = _range_up(key, end, total_size, filerange)
748 'async_headers'] = {'Range': 'bytes=%s' % data_range}
749 flying[key] = self._get_block_async(obj, **restargs)
750 blockid_dict[key] = unsaved
752 for thread in flying.values():
754 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
764 if_modified_since=None,
765 if_unmodified_since=None):
766 """Download an object (multiple connections, random blocks)
768 :param obj: (str) remote object path
770 :param dst: open file descriptor (wb+)
772 :param download_cb: optional progress.bar object for downloading
774 :param version: (str) file version
776 :param resume: (bool) if set, preserve already downloaded file parts
778 :param range_str: (str) from, to are file positions (int) in bytes
780 :param if_match: (str)
782 :param if_none_match: (str)
784 :param if_modified_since: (str) formated date
786 :param if_unmodified_since: (str) formated date"""
789 data_range=None if range_str is None else 'bytes=%s' % range_str,
791 if_none_match=if_none_match,
792 if_modified_since=if_modified_since,
793 if_unmodified_since=if_unmodified_since)
800 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
801 assert total_size >= 0
804 self.progress_bar_gen = download_cb(len(hash_list))
808 self._dump_blocks_sync(
817 self._dump_blocks_async(
828 dst.truncate(total_size)
832 def download_to_string(
839 if_modified_since=None,
840 if_unmodified_since=None):
841 """Download an object to a string (multiple connections). This method
842 uses threads for http requests, but stores all content in memory.
844 :param obj: (str) remote object path
846 :param download_cb: optional progress.bar object for downloading
848 :param version: (str) file version
850 :param range_str: (str) from, to are file positions (int) in bytes
852 :param if_match: (str)
854 :param if_none_match: (str)
856 :param if_modified_since: (str) formated date
858 :param if_unmodified_since: (str) formated date
860 :returns: (str) the whole object contents
864 data_range=None if range_str is None else 'bytes=%s' % range_str,
866 if_none_match=if_none_match,
867 if_modified_since=if_modified_since,
868 if_unmodified_since=if_unmodified_since)
875 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
876 assert total_size >= 0
879 self.progress_bar_gen = download_cb(len(hash_list))
882 num_of_blocks = len(remote_hashes)
883 ret = [''] * num_of_blocks
884 self._init_thread_limit()
887 for blockid, blockhash in enumerate(remote_hashes):
888 start = blocksize * blockid
889 is_last = start + blocksize > total_size
890 end = (total_size - 1) if is_last else (start + blocksize - 1)
891 data_range_str = _range_up(start, end, end, range_str)
893 self._watch_thread_limit(flying.values())
894 restargs['data_range'] = 'bytes=%s' % data_range_str
895 flying[blockid] = self._get_block_async(obj, **restargs)
896 for runid, thread in flying.items():
897 if (blockid + 1) == num_of_blocks:
899 elif thread.isAlive():
902 raise thread.exception
903 ret[runid] = thread.value.content
907 except KeyboardInterrupt:
908 sendlog.info('- - - wait for threads to finish')
909 for thread in activethreads():
912 #Command Progress Bar method
913 def _cb_next(self, step=1):
914 if hasattr(self, 'progress_bar_gen'):
916 for i in xrange(step):
917 self.progress_bar_gen.next()
921 def _complete_cb(self):
924 self.progress_bar_gen.next()
928 def get_object_hashmap(
933 if_modified_since=None,
934 if_unmodified_since=None):
936 :param obj: (str) remote object path
938 :param if_match: (str)
940 :param if_none_match: (str)
942 :param if_modified_since: (str) formated date
944 :param if_unmodified_since: (str) formated date
953 if_etag_match=if_match,
954 if_etag_not_match=if_none_match,
955 if_modified_since=if_modified_since,
956 if_unmodified_since=if_unmodified_since)
957 except ClientError as err:
958 if err.status == 304 or err.status == 412:
963 def set_account_group(self, group, usernames):
967 :param usernames: (list)
969 r = self.account_post(update=True, groups={group: usernames})
972 def del_account_group(self, group):
976 self.account_post(update=True, groups={group: []})
978 def get_account_info(self, until=None):
980 :param until: (str) formated date
984 r = self.account_head(until=until)
985 if r.status_code == 401:
986 raise ClientError("No authorization", status=401)
989 def get_account_quota(self):
994 self.get_account_info(),
995 'X-Account-Policy-Quota',
998 #def get_account_versioning(self):
1003 # self.get_account_info(),
1004 # 'X-Account-Policy-Versioning',
1007 def get_account_meta(self, until=None):
1009 :param until: (str) formated date
1013 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1015 def get_account_group(self):
1019 return filter_in(self.get_account_info(), 'X-Account-Group-')
1021 def set_account_meta(self, metapairs):
1023 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1025 assert(type(metapairs) is dict)
1026 r = self.account_post(update=True, metadata=metapairs)
1029 def del_account_meta(self, metakey):
1031 :param metakey: (str) metadatum key
1033 r = self.account_post(update=True, metadata={metakey: ''})
1036 #def set_account_quota(self, quota):
1038 # :param quota: (int)
1040 # self.account_post(update=True, quota=quota)
1042 #def set_account_versioning(self, versioning):
1044 # :param versioning: (str)
1046 # r = self.account_post(update=True, versioning=versioning)
1049 def list_containers(self):
1053 r = self.account_get()
1056 def del_container(self, until=None, delimiter=None):
1058 :param until: (str) formated date
1060 :param delimiter: (str) with / empty container
1062 :raises ClientError: 404 Container does not exist
1064 :raises ClientError: 409 Container is not empty
1066 self._assert_container()
1067 r = self.container_delete(
1069 delimiter=delimiter,
1070 success=(204, 404, 409))
1071 if r.status_code == 404:
1073 'Container "%s" does not exist' % self.container,
1075 elif r.status_code == 409:
1077 'Container "%s" is not empty' % self.container,
1081 def get_container_versioning(self, container=None):
1083 :param container: (str)
1087 cnt_back_up = self.container
1089 self.container = container or cnt_back_up
1091 self.get_container_info(),
1092 'X-Container-Policy-Versioning')
1094 self.container = cnt_back_up
1096 def get_container_limit(self, container=None):
1098 :param container: (str)
1102 cnt_back_up = self.container
1104 self.container = container or cnt_back_up
1106 self.get_container_info(),
1107 'X-Container-Policy-Quota')
1109 self.container = cnt_back_up
1111 def get_container_info(self, until=None):
1113 :param until: (str) formated date
1117 :raises ClientError: 404 Container not found
1120 r = self.container_head(until=until)
1121 except ClientError as err:
1122 err.details.append('for container %s' % self.container)
1126 def get_container_meta(self, until=None):
1128 :param until: (str) formated date
1133 self.get_container_info(until=until),
1136 def get_container_object_meta(self, until=None):
1138 :param until: (str) formated date
1143 self.get_container_info(until=until),
1144 'X-Container-Object-Meta')
1146 def set_container_meta(self, metapairs):
1148 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1150 assert(type(metapairs) is dict)
1151 r = self.container_post(update=True, metadata=metapairs)
1154 def del_container_meta(self, metakey):
1156 :param metakey: (str) metadatum key
1158 :returns: (dict) response headers
1160 r = self.container_post(update=True, metadata={metakey: ''})
1163 def set_container_limit(self, limit):
1167 r = self.container_post(update=True, quota=limit)
1170 def set_container_versioning(self, versioning):
1172 :param versioning: (str)
1174 r = self.container_post(update=True, versioning=versioning)
1177 def del_object(self, obj, until=None, delimiter=None):
1179 :param obj: (str) remote object path
1181 :param until: (str) formated date
1183 :param delimiter: (str)
1185 self._assert_container()
1186 r = self.object_delete(obj, until=until, delimiter=delimiter)
1189 def set_object_meta(self, obj, metapairs):
1191 :param obj: (str) remote object path
1193 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1195 assert(type(metapairs) is dict)
1196 r = self.object_post(obj, update=True, metadata=metapairs)
1199 def del_object_meta(self, obj, metakey):
1201 :param obj: (str) remote object path
1203 :param metakey: (str) metadatum key
1205 r = self.object_post(obj, update=True, metadata={metakey: ''})
1208 def publish_object(self, obj):
1210 :param obj: (str) remote object path
1212 :returns: (str) access url
1214 self.object_post(obj, update=True, public=True)
1215 info = self.get_object_info(obj)
1216 return info['x-object-public']
1217 pref, sep, rest = self.base_url.partition('//')
1218 base = rest.split('/')[0]
1219 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1221 def unpublish_object(self, obj):
1223 :param obj: (str) remote object path
1225 r = self.object_post(obj, update=True, public=False)
1228 def get_object_info(self, obj, version=None):
1230 :param obj: (str) remote object path
1232 :param version: (str)
1237 r = self.object_head(obj, version=version)
1239 except ClientError as ce:
1240 if ce.status == 404:
1241 raise ClientError('Object %s not found' % obj, status=404)
1244 def get_object_meta(self, obj, version=None):
1246 :param obj: (str) remote object path
1248 :param version: (str)
1253 self.get_object_info(obj, version=version),
1256 def get_object_sharing(self, obj):
1258 :param obj: (str) remote object path
1263 self.get_object_info(obj),
1268 perms = r['x-object-sharing'].split(';')
1273 raise ClientError('Incorrect reply format')
1274 (key, val) = perm.strip().split('=')
1278 def set_object_sharing(
1280 read_permission=False, write_permission=False):
1281 """Give read/write permisions to an object.
1283 :param obj: (str) remote object path
1285 :param read_permission: (list - bool) users and user groups that get
1286 read permission for this object - False means all previous read
1287 permissions will be removed
1289 :param write_permission: (list - bool) of users and user groups to get
1290 write permission for this object - False means all previous write
1291 permissions will be removed
1293 :returns: (dict) response headers
1296 perms = dict(read=read_permission or '', write=write_permission or '')
1297 r = self.object_post(obj, update=True, permissions=perms)
1300 def del_object_sharing(self, obj):
1302 :param obj: (str) remote object path
1304 return self.set_object_sharing(obj)
1306 def append_object(self, obj, source_file, upload_cb=None):
1308 :param obj: (str) remote object path
1310 :param source_file: open file descriptor
1312 :param upload_db: progress.bar for uploading
1314 self._assert_container()
1315 meta = self.get_container_info()
1316 blocksize = int(meta['x-container-block-size'])
1317 filesize = fstat(source_file.fileno()).st_size
1318 nblocks = 1 + (filesize - 1) // blocksize
1322 self.progress_bar_gen = upload_cb(nblocks)
1325 self._init_thread_limit()
1327 for i in range(nblocks):
1328 block = source_file.read(min(blocksize, filesize - offset))
1329 offset += len(block)
1331 self._watch_thread_limit(flying.values())
1333 flying[i] = SilentEvent(
1334 method=self.object_post,
1337 content_range='bytes */*',
1338 content_type='application/octet-stream',
1339 content_length=len(block),
1343 for key, thread in flying.items():
1344 if thread.isAlive():
1346 unfinished[key] = thread
1349 if thread.exception:
1350 raise thread.exception
1351 headers[key] = thread.value.headers
1354 except KeyboardInterrupt:
1355 sendlog.info('- - - wait for threads to finish')
1356 for thread in activethreads():
1359 from time import sleep
1360 sleep(2 * len(activethreads()))
1361 return headers.values()
1363 def truncate_object(self, obj, upto_bytes):
1365 :param obj: (str) remote object path
1367 :param upto_bytes: max number of bytes to leave on file
1369 :returns: (dict) response headers
1371 r = self.object_post(
1374 content_range='bytes 0-%s/*' % upto_bytes,
1375 content_type='application/octet-stream',
1376 object_bytes=upto_bytes,
1377 source_object=path4url(self.container, obj))
1380 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1381 """Overwrite a part of an object from local source file
1383 :param obj: (str) remote object path
1385 :param start: (int) position in bytes to start overwriting from
1387 :param end: (int) position in bytes to stop overwriting at
1389 :param source_file: open file descriptor
1391 :param upload_db: progress.bar for uploading
1394 r = self.get_object_info(obj)
1395 rf_size = int(r['content-length'])
1396 if rf_size < int(start):
1398 'Range start exceeds file size',
1400 elif rf_size < int(end):
1402 'Range end exceeds file size',
1404 self._assert_container()
1405 meta = self.get_container_info()
1406 blocksize = int(meta['x-container-block-size'])
1407 filesize = fstat(source_file.fileno()).st_size
1408 datasize = int(end) - int(start) + 1
1409 nblocks = 1 + (datasize - 1) // blocksize
1412 self.progress_bar_gen = upload_cb(nblocks)
1415 for i in range(nblocks):
1416 read_size = min(blocksize, filesize - offset, datasize - offset)
1417 block = source_file.read(read_size)
1418 r = self.object_post(
1421 content_type='application/octet-stream',
1422 content_length=len(block),
1423 content_range='bytes %s-%s/*' % (
1425 start + offset + len(block) - 1),
1427 headers.append(dict(r.headers))
1428 offset += len(block)
1434 self, src_container, src_object, dst_container,
1436 source_version=None,
1437 source_account=None,
1442 :param src_container: (str) source container
1444 :param src_object: (str) source object path
1446 :param dst_container: (str) destination container
1448 :param dst_object: (str) destination object path
1450 :param source_version: (str) source object version
1452 :param source_account: (str) account to copy from
1454 :param public: (bool)
1456 :param content_type: (str)
1458 :param delimiter: (str)
1460 :returns: (dict) response headers
1462 self._assert_account()
1463 self.container = dst_container
1464 src_path = path4url(src_container, src_object)
1465 r = self.object_put(
1466 dst_object or src_object,
1470 source_version=source_version,
1471 source_account=source_account,
1473 content_type=content_type,
1474 delimiter=delimiter)
1478 self, src_container, src_object, dst_container,
1480 source_account=None,
1481 source_version=None,
1486 :param src_container: (str) source container
1488 :param src_object: (str) source object path
1490 :param dst_container: (str) destination container
1492 :param dst_object: (str) destination object path
1494 :param source_account: (str) account to move from
1496 :param source_version: (str) source object version
1498 :param public: (bool)
1500 :param content_type: (str)
1502 :param delimiter: (str)
1504 :returns: (dict) response headers
1506 self._assert_account()
1507 self.container = dst_container
1508 dst_object = dst_object or src_object
1509 src_path = path4url(src_container, src_object)
1510 r = self.object_put(
1515 source_account=source_account,
1516 source_version=source_version,
1518 content_type=content_type,
1519 delimiter=delimiter)
1522 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1523 """Get accounts that share with self.account
1527 :param marker: (str)
1531 self._assert_account()
1533 self.set_param('format', 'json')
1534 self.set_param('limit', limit, iff=limit is not None)
1535 self.set_param('marker', marker, iff=marker is not None)
1538 success = kwargs.pop('success', (200, 204))
1539 r = self.get(path, *args, success=success, **kwargs)
1542 def get_object_versionlist(self, obj):
1544 :param obj: (str) remote object path
1548 self._assert_container()
1549 r = self.object_get(obj, format='json', version='list')
1550 return r.json['versions']