1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
# Compute the Pithos-style hash of one data block with the container's
# configured hash algorithm (`blockhash` names it, e.g. 'sha256');
# trailing NUL bytes are stripped first, presumably because blocks are
# zero-padded server-side.
# NOTE(review): no return statement is visible in this excerpt; the
# listing appears truncated — presumably it returns hexlify(h.digest())
# like _hash_from_file below. Confirm against the full file.
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
# Clip a user-supplied HTTP range expression to the [start, end] window of
# one block. Each comma-separated term is intersected with the window and
# re-emitted; terms wholly outside the window are dropped.
# NOTE(review): this excerpt is missing interior lines — `selected` is
# appended to but its initialization is not visible, the int() conversions
# of v0/v1 are absent, and several branch headers (the suffix-range '-x'
# case) are gone. Do not treat the visible control flow as complete.
55 def _range_up(start, end, max_value, a_range):
57     :param start: (int) the window bottom
59     :param end: (int) the window top
61     :param max_value: (int) maximum accepted value
63     :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64         where X: x|x-y|-x where x < y and x, y natural numbers
66     :returns: (str) a range string cut-off for the start-end range
67         an empty response means this window is out of range
69     assert start >= 0, '_range_up called w. start(%s) < 0' % start
70     assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
72     assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
# No explicit range given: the whole window is the range.
75         return '%s-%s' % (start, end)
77     for some_range in a_range.split(','):
78         v0, sep, v1 = some_range.partition('-')
# x-y term: drop if disjoint from the window, else clamp both ends.
83             if v1 < start or v0 > end or v1 < v0:
85             v0 = v0 if v0 > start else start
86             v1 = v1 if v1 < end else end
87             selected.append('%s-%s' % (v0, v1))
# bare x term (prefix range up to x), clamped to the window top.
91             v1 = v0 if v0 <= end else end
92             selected.append('%s-%s' % (start, v1))
# -x term (suffix range): only relevant if the suffix reaches this window.
95             if max_value - v1 > end:
97                 v0 = (max_value - v1) if max_value - v1 > start else start
98                 selected.append('%s-%s' % (v0, end))
99     return ','.join(selected)
# Client for the Synnefo Pithos+ object-storage API. All HTTP plumbing
# lives in PithosRestClient; this class layers convenience operations
# (chunked upload/download with threads, metadata/sharing helpers) on top.
102 class PithosClient(PithosRestClient):
103     """Synnefo Pithos+ API client"""
# Constructor only delegates; account/container default to None and can
# be set later on the instance (many methods below swap self.container).
105     def __init__(self, base_url, token, account=None, container=None):
106         super(PithosClient, self).__init__(base_url, token, account, container)
# Both methods below use the save/swap/restore pattern on self.container:
# back up the current container, temporarily switch to the requested one,
# perform the REST call, then restore. NOTE(review): the `try:`/`finally:`
# lines of that pattern are not visible in this excerpt — presumably the
# restore happens in a finally block; confirm against the full file.
108 def create_container(
110         container=None, sizelimit=None, versioning=None, metadata=None,
113     :param container: (str) if not given, self.container is used instead
115     :param sizelimit: (int) container total size limit in bytes
117     :param versioning: (str) can be auto or whatever supported by server
119     :param metadata: (dict) Custom user-defined metadata of the form
120         { 'name1': 'value1', 'name2': 'value2', ... }
122     :returns: (dict) response headers
124     cnt_back_up = self.container
126         self.container = container or cnt_back_up
127         r = self.container_put(
128             quota=sizelimit, versioning=versioning, metadata=metadata,
132         self.container = cnt_back_up
# Purge: delete the container *and* its history up to "now".
# `unicode(time())` marks this as Python 2 code; `time` is presumably
# imported in a section of the file not shown here.
134 def purge_container(self, container=None):
135     """Delete an empty container and destroy associated blocks"""
136     cnt_back_up = self.container
138         self.container = container or cnt_back_up
139         r = self.container_delete(until=unicode(time()))
141         self.container = cnt_back_up
# Single-request upload: the whole payload goes up in one object_put,
# optionally treating the source as a JSON hashmap file (withHashFile).
# NOTE(review): the excerpt omits the try/except around json parsing and
# the object_put call itself — the visible raise/keyword-argument lines
# belong to those missing statements.
144 def upload_object_unchunked(
149         content_encoding=None,
150         content_disposition=None,
155     :param obj: (str) remote object path
157     :param f: open file descriptor
159     :param withHashFile: (bool)
161     :param size: (int) size of data to upload
165     :param content_encoding: (str)
167     :param content_disposition: (str)
169     :param content_type: (str)
171     :param sharing: {'read':[user and/or grp names],
172         'write':[usr and/or grp names]}
174     :param public: (bool)
176     :returns: (dict) created object metadata
178     self._assert_container()
# Round-trip through json to validate the hashmap file's contents.
184             data = json.dumps(json.loads(data))
186             raise ClientError('"%s" is not json-formated' % f.name, 1)
188             msg = '"%s" is not a valid hashmap file' % f.name
189             raise ClientError(msg, 1)
# Plain data path: read exactly `size` bytes if given, else everything.
192         data = readall(f, size) if size else f.read()
197         content_encoding=content_encoding,
198         content_disposition=content_disposition,
199         content_type=content_type,
# Create an object that is a manifestation (manifest) of existing data:
# no payload is sent, only an X-Object-Manifest style reference.
205 def create_object_by_manifestation(
208         content_encoding=None,
209         content_disposition=None,
214     :param obj: (str) remote object path
218     :param content_encoding: (str)
220     :param content_disposition: (str)
222     :param content_type: (str)
224     :param sharing: {'read':[user and/or grp names],
225         'write':[usr and/or grp names]}
227     :param public: (bool)
229     :returns: (dict) created object metadata
231     self._assert_container()
236         content_encoding=content_encoding,
237         content_disposition=content_disposition,
238         content_type=content_type,
241         manifest='%s/%s' % (self.container, obj))
244 # upload_* auxiliary methods
# Fire-and-return async block upload: wraps _put_block in a SilentEvent
# thread. NOTE(review): the event.start()/return lines are not visible in
# this excerpt — presumably it starts the thread and returns the event.
245 def _put_block_async(self, data, hash):
246     event = SilentEvent(method=self._put_block, data=data, hash=hash)
# Synchronous block upload via container POST (update mode); the server
# echoes the block hash back in the JSON body, which is sanity-checked.
250 def _put_block(self, data, hash):
251     r = self.container_post(
253         content_type='application/octet-stream',
254         content_length=len(data),
257     assert r.json[0] == hash, 'Local hash does not match server'
# Resolve container blocksize/blockhash (optionally via a caller-supplied
# cache dict keyed by container name) and size/block count of the source.
259 def _get_file_block_info(self, fileobj, size=None, cache=None):
261     :param fileobj: (file descriptor) source
263     :param size: (int) size of data to upload from source
265     :param cache: (dict) if provided, cache container info response to
266         avoid redundant calls
268     if isinstance(cache, dict):
# Cache hit/miss handling — the try/except around the lookup is not
# visible in this excerpt.
270             meta = cache[self.container]
272             meta = self.get_container_info()
273             cache[self.container] = meta
275         meta = self.get_container_info()
276     blocksize = int(meta['x-container-block-size'])
277     blockhash = meta['x-container-block-hash']
# fstat is presumably imported from os in the (unseen) import section.
278     size = size if size is not None else fstat(fileobj.fileno()).st_size
# Ceiling division: number of blocks needed to cover `size` bytes.
279     nblocks = 1 + (size - 1) // blocksize
280     return (blocksize, blockhash, size, nblocks)
# Send the full hashmap with object_put; a 201 means the server already
# has every block (returns (None, headers)), otherwise the JSON body lists
# the hashes of blocks the server is missing.
282 def _create_object_or_get_missing_hashes(
289         if_etag_not_match=None,
290         content_encoding=None,
291         content_disposition=None,
299         content_type=content_type,
301         if_etag_match=if_etag_match,
302         if_etag_not_match=if_etag_not_match,
303         content_encoding=content_encoding,
304         content_disposition=content_disposition,
305         permissions=permissions,
308     return (None if r.status_code == 201 else r.json), r.headers
# Walk the source file block by block, hashing each block and recording
# (offset, length) per hash in hmap; hash_cb is an optional progress
# generator. NOTE(review): the offset/bytes bookkeeping lines between the
# visible statements are missing from this excerpt.
310 def _calculate_blocks_for_upload(
311         self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
315         hash_gen = hash_cb(nblocks)
318     for i in range(nblocks):
319         block = readall(fileobj, min(blocksize, size - offset))
321         hash = _pithos_hash(block, blockhash)
323         hmap[hash] = (offset, bytes)
327     msg = ('Failed to calculate uploaded blocks:'
328            ' Offset and object size do not match')
329     assert offset == size, msg
# Upload missing blocks concurrently, bounded by the thread limit; on a
# 502 the pool size is clamped down. Returns the hashes that still failed
# so the caller can retry.
331 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
332     """upload missing blocks asynchronously"""
334     self._init_thread_limit()
339         offset, bytes = hmap[hash]
341         data = readall(fileobj, bytes)
342         r = self._put_block_async(data, hash)
344         unfinished = self._watch_thread_limit(flying)
345         for thread in set(flying).difference(unfinished):
347                 failures.append(thread)
# Backend gateway errors (502) trigger shrinking the pool to the
# current thread limit.
350                     ClientError) and thread.exception.status == 502:
351                     self.POOLSIZE = self._thread_limit
352             elif thread.isAlive():
353                 flying.append(thread)
361     for thread in flying:
364             failures.append(thread)
371     return [failure.kwargs['hash'] for failure in failures]
# NOTE(review): the `def upload_object(` line itself is not visible in
# this excerpt — these are the tail of its parameter list, its docstring,
# and a sampled body. Overall flow (visible): compute block info and
# hashes, send hashmap, retry-upload missing blocks, handle Ctrl-C by
# draining live threads, then return headers from a final hashmap PUT.
381         content_encoding=None,
382         content_disposition=None,
386         container_info_cache=None):
387     """Upload an object using multiple connections (threads)
389     :param obj: (str) remote object path
391     :param f: open file descriptor (rb)
393     :param hash_cb: optional progress.bar object for calculating hashes
395     :param upload_cb: optional progress.bar object for uploading
399     :param if_etag_match: (str) Push that value to if-match header at file
402     :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
403         it does not exist remotely, otherwise the operation will fail.
404         Involves the case of an object with the same path is created while
405         the object is being uploaded.
407     :param content_encoding: (str)
409     :param content_disposition: (str)
411     :param content_type: (str)
413     :param sharing: {'read':[user and/or grp names],
414         'write':[usr and/or grp names]}
416     :param public: (bool)
418     :param container_info_cache: (dict) if given, avoid redundant calls to
419         server for container info (block size and hash information)
421     self._assert_container()
424         blocksize, blockhash, size, nblocks) = self._get_file_block_info(
425         f, size, container_info_cache)
426     (hashes, hmap, offset) = ([], {}, 0)
428         content_type = 'application/octet-stream'
430     self._calculate_blocks_for_upload(
437     hashmap = dict(bytes=size, hashes=hashes)
# if_not_exist maps to If-None-Match: * so the PUT fails if the object
# already exists.
438     missing, obj_headers = self._create_object_or_get_missing_hashes(
440         content_type=content_type,
442         if_etag_match=if_etag_match,
443         if_etag_not_match='*' if if_not_exist else None,
444         content_encoding=content_encoding,
445         content_disposition=content_disposition,
# Progress bar is pre-advanced for blocks the server already has.
453         upload_gen = upload_cb(len(missing))
454         for i in range(len(missing), len(hashmap['hashes']) + 1):
465         sendlog.info('%s blocks missing' % len(missing))
466         num_of_blocks = len(missing)
467         missing = self._upload_missing_blocks(
# No progress between retries means the upload is stuck — the visible
# error path below collects per-thread exception details.
473             if num_of_blocks == len(missing):
476                 num_of_blocks = len(missing)
481             details = ['%s' % thread.exception for thread in missing]
483             details = ['Also, failed to read thread exceptions']
485             '%s blocks failed to upload' % len(missing),
487     except KeyboardInterrupt:
488         sendlog.info('- - - wait for threads to finish')
489         for thread in activethreads():
497         content_type=content_type,
498         content_encoding=content_encoding,
499         if_etag_match=if_etag_match,
500         if_etag_not_match='*' if if_not_exist else None,
# In-memory variant of upload_object: the payload is a string, so blocks
# are sliced out of input_str instead of read from a file, and hmap maps
# hash -> (start, block-bytes). NOTE(review): interior lines (flying/
# failures initialization, tries counter, the retry loop header) are
# missing from this excerpt.
508 def upload_from_string(
509         self, obj, input_str,
515         content_encoding=None,
516         content_disposition=None,
520         container_info_cache=None):
521     """Upload an object using multiple connections (threads)
523     :param obj: (str) remote object path
525     :param input_str: (str) upload content
527     :param hash_cb: optional progress.bar object for calculating hashes
529     :param upload_cb: optional progress.bar object for uploading
533     :param if_etag_match: (str) Push that value to if-match header at file
536     :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
537         it does not exist remotely, otherwise the operation will fail.
538         Involves the case of an object with the same path is created while
539         the object is being uploaded.
541     :param content_encoding: (str)
543     :param content_disposition: (str)
545     :param content_type: (str)
547     :param sharing: {'read':[user and/or grp names],
548         'write':[usr and/or grp names]}
550     :param public: (bool)
552     :param container_info_cache: (dict) if given, avoid redundant calls to
553         server for container info (block size and hash information)
555     self._assert_container()
# fileobj=None is safe because an explicit size is supplied.
557     blocksize, blockhash, size, nblocks = self._get_file_block_info(
558         fileobj=None, size=len(input_str), cache=container_info_cache)
559     (hashes, hmap, offset) = ([], {}, 0)
561         content_type = 'application/octet-stream'
565     for blockid in range(nblocks):
566         start = blockid * blocksize
567         block = input_str[start: (start + blocksize)]
568         hashes.append(_pithos_hash(block, blockhash))
569         hmap[hashes[blockid]] = (start, block)
571     hashmap = dict(bytes=size, hashes=hashes)
572     missing, obj_headers = self._create_object_or_get_missing_hashes(
574         content_type=content_type,
576         if_etag_match=if_etag_match,
577         if_etag_not_match='*' if if_not_exist else None,
578         content_encoding=content_encoding,
579         content_disposition=content_disposition,
584     num_of_missing = len(missing)
# Pre-advance the progress bar for already-present blocks.
587         self.progress_bar_gen = upload_cb(nblocks)
588         for i in range(nblocks + 1 - num_of_missing):
# Retry loop: keep re-uploading the still-missing hashes until the
# retry budget runs out or nothing is missing.
594         while tries and missing:
598                 offset, block = hmap[hash]
599                 bird = self._put_block_async(block, hash)
601                 unfinished = self._watch_thread_limit(flying)
602                 for thread in set(flying).difference(unfinished):
604                         failures.append(thread.kwargs['hash'])
606                         flying.append(thread)
610             for thread in flying:
613                     failures.append(thread.kwargs['hash'])
# No progress across a full pass -> give up instead of looping forever.
616             if missing and len(missing) == old_failures:
618             old_failures = len(missing)
621             '%s blocks failed to upload' % len(missing),
622             details=['%s' % thread.exception for thread in missing])
623     except KeyboardInterrupt:
624         sendlog.info('- - - wait for threads to finish')
625         for thread in activethreads():
633         content_type=content_type,
634         content_encoding=content_encoding,
635         if_etag_match=if_etag_match,
636         if_etag_not_match='*' if if_not_exist else None,
644 # download_* auxiliary methods
# Fetch the object's hashmap (temporarily removing any data_range from
# restargs, since the hashmap request must not be ranged) and build a
# reverse map hash -> [block indices]; identical blocks can occur at
# several positions, hence the list.
645 def _get_remote_blocks_info(self, obj, **restargs):
646     #retrieve object hashmap
647     myrange = restargs.pop('data_range', None)
648     hashmap = self.get_object_hashmap(obj, **restargs)
649     restargs['data_range'] = myrange
650     blocksize = int(hashmap['block_size'])
651     blockhash = hashmap['block_hash']
652     total_size = hashmap['bytes']
653     #assert total_size/blocksize + 1 == len(hashmap['hashes'])
655     for i, h in enumerate(hashmap['hashes']):
656         # map_dict[h] = i  -- changed: one hash may map to several block indices
658         map_dict[h].append(i)
661     return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
# Sequential (single-connection) download: fetch each block with a Range
# request clipped by _range_up and write it to dst in order.
663 def _dump_blocks_sync(
664         self, obj, remote_hashes, blocksize, total_size, dst, crange,
668     for blockid, blockhash in enumerate(remote_hashes):
670         start = blocksize * blockid
671         is_last = start + blocksize > total_size
672         end = (total_size - 1) if is_last else (start + blocksize - 1)
673         data_range = _range_up(start, end, total_size, crange)
677             args['data_range'] = 'bytes=%s' % data_range
678         r = self.object_get(obj, success=(200, 206), **args)
# Async single-block GET wrapped in a SilentEvent thread.
# NOTE(review): start/return lines are not visible in this excerpt.
683 def _get_block_async(self, obj, **args):
684     event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
# Hash a block read from an already-downloaded local file, for resume
# verification. NOTE(review): uses strip('\x00') (both ends) whereas
# _pithos_hash uses rstrip — confirm this asymmetry is intentional.
688 def _hash_from_file(self, fp, start, size, blockhash):
690     block = readall(fp, size)
691     h = newhashlib(blockhash)
692     h.update(block.strip('\x00'))
693     return hexlify(h.digest())
# Drain finished download threads: write each fetched block to every file
# position that hash occupies (blockids maps thread key -> positions).
695 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
696     """write the results of a greenleted rest call to a file
698     :param offset: the offset of the file up to blocksize
699         - e.g. if the range is 10-100, all blocks will be written to
702     for key, g in flying.items():
707         block = g.value.content
708         for block_start in blockids[key]:
709             local_file.seek(block_start + offset)
710             local_file.write(block)
# Threaded download into a seekable local file. With resume=True, blocks
# already present on disk (verified by re-hashing the local range) are
# skipped and only counted toward progress. NOTE(review): interior lines
# (flying/offset initialization, the per-key loop header that defines
# `key`) are missing from this excerpt.
716 def _dump_blocks_async(
717         self, obj, remote_hashes, blocksize, total_size, local_file,
718         blockhash=None, resume=False, filerange=None, **restargs):
719     file_size = fstat(local_file.fileno()).st_size if resume else 0
721     blockid_dict = dict()
724     self._init_thread_limit()
725     for block_hash, blockids in remote_hashes.items():
# Convert block indices to byte offsets in the target file.
726         blockids = [blk * blocksize for blk in blockids]
727         unsaved = [blk for blk in blockids if not (
728             blk < file_size and block_hash == self._hash_from_file(
729                 local_file, blk, blocksize, blockhash))]
# Advance the progress bar for blocks that were already on disk.
730         self._cb_next(len(blockids) - len(unsaved))
733             self._watch_thread_limit(flying.values())
735                 flying, blockid_dict, local_file, offset,
737             end = total_size - 1 if (
738                 key + blocksize > total_size) else key + blocksize - 1
742             data_range = _range_up(key, end, total_size, filerange)
747                 'async_headers'] = {'Range': 'bytes=%s' % data_range}
748             flying[key] = self._get_block_async(obj, **restargs)
749             blockid_dict[key] = unsaved
# Wait for the remaining threads, then flush their results to the file.
751     for thread in flying.values():
753     self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
# NOTE(review): the `def download_object(` line is not visible in this
# excerpt — these are the trailing parameters, docstring, and a sampled
# body. Visible flow: build restargs with conditional headers, fetch the
# remote hashmap, then dispatch to the sync or async block dumper and
# finally truncate dst to the object size.
763         if_modified_since=None,
764         if_unmodified_since=None):
765     """Download an object (multiple connections, random blocks)
767     :param obj: (str) remote object path
769     :param dst: open file descriptor (wb+)
771     :param download_cb: optional progress.bar object for downloading
773     :param version: (str) file version
775     :param resume: (bool) if set, preserve already downloaded file parts
777     :param range_str: (str) from, to are file positions (int) in bytes
779     :param if_match: (str)
781     :param if_none_match: (str)
783     :param if_modified_since: (str) formated date
785     :param if_unmodified_since: (str) formated date"""
788         data_range=None if range_str is None else 'bytes=%s' % range_str,
790         if_none_match=if_none_match,
791         if_modified_since=if_modified_since,
792         if_unmodified_since=if_unmodified_since)
799         remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
800     assert total_size >= 0
803         self.progress_bar_gen = download_cb(len(hash_list))
# Sync path (non-seekable dst, presumably) vs threaded path.
807         self._dump_blocks_sync(
816         self._dump_blocks_async(
# Drop any stale tail left over from a previous, larger download.
827             dst.truncate(total_size)
# Threaded download that assembles the object in memory: one slot per
# block in `ret`, filled as threads complete, and presumably joined into
# a single string at the (unseen) return. NOTE(review): interior lines
# (flying initialization, the try header, thread.join paths, the final
# return and KeyboardInterrupt cleanup tail) are missing from this
# excerpt.
831 def download_to_string(
838         if_modified_since=None,
839         if_unmodified_since=None):
840     """Download an object to a string (multiple connections). This method
841     uses threads for http requests, but stores all content in memory.
843     :param obj: (str) remote object path
845     :param download_cb: optional progress.bar object for downloading
847     :param version: (str) file version
849     :param range_str: (str) from, to are file positions (int) in bytes
851     :param if_match: (str)
853     :param if_none_match: (str)
855     :param if_modified_since: (str) formated date
857     :param if_unmodified_since: (str) formated date
859     :returns: (str) the whole object contents
863         data_range=None if range_str is None else 'bytes=%s' % range_str,
865         if_none_match=if_none_match,
866         if_modified_since=if_modified_since,
867         if_unmodified_since=if_unmodified_since)
874         remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
875     assert total_size >= 0
878         self.progress_bar_gen = download_cb(len(hash_list))
881     num_of_blocks = len(remote_hashes)
882     ret = [''] * num_of_blocks
883     self._init_thread_limit()
886     for blockid, blockhash in enumerate(remote_hashes):
887         start = blocksize * blockid
888         is_last = start + blocksize > total_size
889         end = (total_size - 1) if is_last else (start + blocksize - 1)
# NOTE(review): max_value here is `end`, while _dump_blocks_sync passes
# `total_size` for the same argument — confirm this is intentional.
890         data_range_str = _range_up(start, end, end, range_str)
892         self._watch_thread_limit(flying.values())
893         restargs['data_range'] = 'bytes=%s' % data_range_str
894         flying[blockid] = self._get_block_async(obj, **restargs)
895         for runid, thread in flying.items():
896             if (blockid + 1) == num_of_blocks:
898             elif thread.isAlive():
901                 raise thread.exception
902             ret[runid] = thread.value.content
906     except KeyboardInterrupt:
907         sendlog.info('- - - wait for threads to finish')
908         for thread in activethreads():
911 #Command Progress Bar method
# Advance the progress generator `step` ticks, if a progress bar was
# installed on this instance. xrange/.next() are Python 2 idioms; the
# StopIteration guard around .next() is presumably in the unseen lines.
912 def _cb_next(self, step=1):
913     if hasattr(self, 'progress_bar_gen'):
915             for i in xrange(step):
916                 self.progress_bar_gen.next()
# Exhaust the progress generator so the bar renders as complete.
920 def _complete_cb(self):
923             self.progress_bar_gen.next()
# Fetch the object's hashmap (format=json, hashmap mode) with the usual
# conditional headers. The visible except shows that 304/412 get special
# treatment — presumably returning an empty result instead of raising.
927 def get_object_hashmap(
932         if_modified_since=None,
933         if_unmodified_since=None):
935     :param obj: (str) remote object path
937     :param if_match: (str)
939     :param if_none_match: (str)
941     :param if_modified_since: (str) formated date
943     :param if_unmodified_since: (str) formated date
952             if_etag_match=if_match,
953             if_etag_not_match=if_none_match,
954             if_modified_since=if_modified_since,
955             if_unmodified_since=if_unmodified_since)
956     except ClientError as err:
957         if err.status == 304 or err.status == 412:
# Account-level helpers: groups, info, quota and metadata. All are thin
# wrappers over account_head/account_post plus filter_in() to select
# header families from the response.
962 def set_account_group(self, group, usernames):
966     :param usernames: (list)
968     r = self.account_post(update=True, groups={group: usernames})
# Deleting a group = posting it with an empty member list.
971 def del_account_group(self, group):
975     self.account_post(update=True, groups={group: []})
977 def get_account_info(self, until=None):
979     :param until: (str) formated date
983     r = self.account_head(until=until)
984     if r.status_code == 401:
985         raise ClientError("No authorization", status=401)
# Quota is read from the X-Account-Policy-Quota header family.
988 def get_account_quota(self):
993         self.get_account_info(),
994         'X-Account-Policy-Quota',
# Commented-out legacy API kept by the original authors; left as-is.
997 #def get_account_versioning(self):
1002 #        self.get_account_info(),
1003 #        'X-Account-Policy-Versioning',
1006 def get_account_meta(self, until=None):
1008     :param until: (str) formated date
1012     return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1014 def get_account_group(self):
1018     return filter_in(self.get_account_info(), 'X-Account-Group-')
1020 def set_account_meta(self, metapairs):
1022     :param metapairs: (dict) {key1:val1, key2:val2, ...}
1024     assert(type(metapairs) is dict)
1025     r = self.account_post(update=True, metadata=metapairs)
# Deleting a metadatum = posting it with an empty value.
1028 def del_account_meta(self, metakey):
1030     :param metakey: (str) metadatum key
1032     r = self.account_post(update=True, metadata={metakey: ''})
# More commented-out legacy setters preserved by the original authors.
1035 #def set_account_quota(self, quota):
1037 #    :param quota: (int)
1039 #    self.account_post(update=True, quota=quota)
1041 #def set_account_versioning(self, versioning):
1043 #    :param versioning: (str)
1045 #    r = self.account_post(update=True, versioning=versioning)
# Container-level helpers. The get_container_* getters use the same
# save/swap/restore pattern on self.container as create_container; the
# try/finally lines of that pattern are not visible in this excerpt.
1048 def list_containers(self):
1052     r = self.account_get()
1055 def del_container(self, until=None, delimiter=None):
1057     :param until: (str) formated date
1059     :param delimiter: (str) with / empty container
1061     :raises ClientError: 404 Container does not exist
1063     :raises ClientError: 409 Container is not empty
1065     self._assert_container()
# 404/409 are accepted as "successful" HTTP outcomes and converted to
# descriptive ClientErrors below.
1066     r = self.container_delete(
1068         delimiter=delimiter,
1069         success=(204, 404, 409))
1070     if r.status_code == 404:
1072             'Container "%s" does not exist' % self.container,
1074     elif r.status_code == 409:
1076             'Container "%s" is not empty' % self.container,
1080 def get_container_versioning(self, container=None):
1082     :param container: (str)
1086     cnt_back_up = self.container
1088         self.container = container or cnt_back_up
1090             self.get_container_info(),
1091             'X-Container-Policy-Versioning')
1093         self.container = cnt_back_up
1095 def get_container_limit(self, container=None):
1097     :param container: (str)
1101     cnt_back_up = self.container
1103         self.container = container or cnt_back_up
1105             self.get_container_info(),
1106             'X-Container-Policy-Quota')
1108         self.container = cnt_back_up
# HEAD the container; on error, append the container name to the error
# details before re-raising (re-raise line not visible in this excerpt).
1110 def get_container_info(self, until=None):
1112     :param until: (str) formated date
1116     :raises ClientError: 404 Container not found
1119         r = self.container_head(until=until)
1120     except ClientError as err:
1121         err.details.append('for container %s' % self.container)
1125 def get_container_meta(self, until=None):
1127     :param until: (str) formated date
1132         self.get_container_info(until=until),
1135 def get_container_object_meta(self, until=None):
1137     :param until: (str) formated date
1142         self.get_container_info(until=until),
1143         'X-Container-Object-Meta')
1145 def set_container_meta(self, metapairs):
1147     :param metapairs: (dict) {key1:val1, key2:val2, ...}
1149     assert(type(metapairs) is dict)
1150     r = self.container_post(update=True, metadata=metapairs)
# Deleting container metadata = posting the key with an empty value.
1153 def del_container_meta(self, metakey):
1155     :param metakey: (str) metadatum key
1157     :returns: (dict) response headers
1159     r = self.container_post(update=True, metadata={metakey: ''})
1162 def set_container_limit(self, limit):
1166     r = self.container_post(update=True, quota=limit)
1169 def set_container_versioning(self, versioning):
1171     :param versioning: (str)
1173     r = self.container_post(update=True, versioning=versioning)
# Object-level delete / metadata / publication helpers, all thin wrappers
# over object_delete / object_post.
1176 def del_object(self, obj, until=None, delimiter=None):
1178     :param obj: (str) remote object path
1180     :param until: (str) formated date
1182     :param delimiter: (str)
1184     self._assert_container()
1185     r = self.object_delete(obj, until=until, delimiter=delimiter)
1188 def set_object_meta(self, obj, metapairs):
1190     :param obj: (str) remote object path
1192     :param metapairs: (dict) {key1:val1, key2:val2, ...}
1194     assert(type(metapairs) is dict)
1195     r = self.object_post(obj, update=True, metadata=metapairs)
# Deleting an object metadatum = posting it with an empty value.
1198 def del_object_meta(self, obj, metakey):
1200     :param obj: (str) remote object path
1202     :param metakey: (str) metadatum key
1204     r = self.object_post(obj, update=True, metadata={metakey: ''})
# Mark the object public and return its public access token/URL.
# NOTE(review): the lines after the return (building a full URL from
# base_url) appear unreachable as written — confirm whether a conditional
# is missing from this excerpt or this is dead code in the original too.
1207 def publish_object(self, obj):
1209     :param obj: (str) remote object path
1211     :returns: (str) access url
1213     self.object_post(obj, update=True, public=True)
1214     info = self.get_object_info(obj)
1215     return info['x-object-public']
1216     pref, sep, rest = self.base_url.partition('//')
1217     base = rest.split('/')[0]
1218     return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1220 def unpublish_object(self, obj):
1222     :param obj: (str) remote object path
1224     r = self.object_post(obj, update=True, public=False)
# Object info / metadata / sharing accessors.
# HEAD the object; a 404 from the transport is re-raised with a clearer
# message (the re-raise-other-errors path is not visible in this excerpt).
1227 def get_object_info(self, obj, version=None):
1229     :param obj: (str) remote object path
1231     :param version: (str)
1236         r = self.object_head(obj, version=version)
1238     except ClientError as ce:
1239         if ce.status == 404:
1240             raise ClientError('Object %s not found' % obj, status=404)
1243 def get_object_meta(self, obj, version=None):
1245     :param obj: (str) remote object path
1247     :param version: (str)
1252         self.get_object_info(obj, version=version),
# Parse the x-object-sharing header ("read=a,b;write=c") into a dict;
# malformed entries raise 'Incorrect reply format'.
1255 def get_object_sharing(self, obj):
1257     :param obj: (str) remote object path
1262         self.get_object_info(obj),
1267     perms = r['x-object-sharing'].split(';')
1272             raise ClientError('Incorrect reply format')
1273         (key, val) = perm.strip().split('=')
1277 def set_object_sharing(
1279         read_permission=False, write_permission=False):
1280     """Give read/write permisions to an object.
1282     :param obj: (str) remote object path
1284     :param read_permission: (list - bool) users and user groups that get
1285         read permission for this object - False means all previous read
1286         permissions will be removed
1288     :param write_permission: (list - bool) of users and user groups to get
1289         write permission for this object - False means all previous write
1290         permissions will be removed
1292     :returns: (dict) response headers
# Falsy permissions collapse to '' which clears existing grants.
1295     perms = dict(read=read_permission or '', write=write_permission or '')
1296     r = self.object_post(obj, update=True, permissions=perms)
# Clearing sharing = setting it with both permissions defaulted (False).
1299 def del_object_sharing(self, obj):
1301     :param obj: (str) remote object path
1303     return self.set_object_sharing(obj)
# Append a local file to a remote object block by block, with one
# SilentEvent thread per block POST (content_range 'bytes */*' means
# append). NOTE(review): interior lines (flying/headers initialization,
# the wait loop header, the data= kwarg of the POST) are missing from
# this excerpt.
1305 def append_object(self, obj, source_file, upload_cb=None):
1307     :param obj: (str) remote object path
1309     :param source_file: open file descriptor
1311     :param upload_cb: progress.bar for uploading
1313     self._assert_container()
1314     meta = self.get_container_info()
1315     blocksize = int(meta['x-container-block-size'])
1316     filesize = fstat(source_file.fileno()).st_size
# Ceiling division over the container blocksize.
1317     nblocks = 1 + (filesize - 1) // blocksize
1321         self.progress_bar_gen = upload_cb(nblocks)
1324     self._init_thread_limit()
1326     for i in range(nblocks):
1327         block = source_file.read(min(blocksize, filesize - offset))
1328         offset += len(block)
1330         self._watch_thread_limit(flying.values())
1332         flying[i] = SilentEvent(
1333             method=self.object_post,
1336             content_range='bytes */*',
1337             content_type='application/octet-stream',
1338             content_length=len(block),
# Collect finished threads' headers; still-alive threads stay pending.
1342         for key, thread in flying.items():
1343             if thread.isAlive():
1345                 unfinished[key] = thread
1348             if thread.exception:
1349                 raise thread.exception
1350             headers[key] = thread.value.headers
1353     except KeyboardInterrupt:
1354         sendlog.info('- - - wait for threads to finish')
1355         for thread in activethreads():
# Local import: only needed on the Ctrl-C path.
1358         from time import sleep
1359         sleep(2 * len(activethreads()))
1360     return headers.values()
# Truncate an object server-side by re-sourcing it from its own first
# upto_bytes bytes (content_range 'bytes 0-N/*').
1362 def truncate_object(self, obj, upto_bytes):
1364     :param obj: (str) remote object path
1366     :param upto_bytes: max number of bytes to leave on file
1368     :returns: (dict) response headers
1370     r = self.object_post(
1373         content_range='bytes 0-%s/*' % upto_bytes,
1374         content_type='application/octet-stream',
1375         object_bytes=upto_bytes,
1376         source_object=path4url(self.container, obj))
# Overwrite bytes [start, end] of a remote object with data read from a
# local file, validating the range against the remote size first, then
# POSTing block-sized chunks with explicit content_range headers.
# NOTE(review): the ClientError constructor lines and some loop
# bookkeeping (headers/offset init, data= kwarg) are missing from this
# excerpt.
1379 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1380     """Overwrite a part of an object from local source file
1382     :param obj: (str) remote object path
1384     :param start: (int) position in bytes to start overwriting from
1386     :param end: (int) position in bytes to stop overwriting at
1388     :param source_file: open file descriptor
1390     :param upload_cb: progress.bar for uploading
# Validate the requested range against the current remote object size.
1393     r = self.get_object_info(obj)
1394     rf_size = int(r['content-length'])
1395     if rf_size < int(start):
1397             'Range start exceeds file size',
1399     elif rf_size < int(end):
1401             'Range end exceeds file size',
1403     self._assert_container()
1404     meta = self.get_container_info()
1405     blocksize = int(meta['x-container-block-size'])
1406     filesize = fstat(source_file.fileno()).st_size
# Inclusive range: end - start + 1 bytes will be written.
1407     datasize = int(end) - int(start) + 1
1408     nblocks = 1 + (datasize - 1) // blocksize
1411         self.progress_bar_gen = upload_cb(nblocks)
1414     for i in range(nblocks):
# Never read past either the local file or the requested window.
1415         read_size = min(blocksize, filesize - offset, datasize - offset)
1416         block = source_file.read(read_size)
1417         r = self.object_post(
1420             content_type='application/octet-stream',
1421             content_length=len(block),
1422             content_range='bytes %s-%s/*' % (
1424                 start + offset + len(block) - 1),
1426         headers.append(dict(r.headers))
1427         offset += len(block)
# NOTE(review): the `def copy_object(` line is not visible — this block
# starts mid-signature. Both copy_object and move_object set
# self.container to the destination (a lasting side effect on the client
# instance) and delegate to object_put with copy_from/move_from semantics
# (the copy_from/move_from kwargs themselves are in unseen lines).
1433         self, src_container, src_object, dst_container,
1435         source_version=None,
1436         source_account=None,
1441     :param src_container: (str) source container
1443     :param src_object: (str) source object path
1445     :param dst_container: (str) destination container
1447     :param dst_object: (str) destination object path
1449     :param source_version: (str) source object version
1451     :param source_account: (str) account to copy from
1453     :param public: (bool)
1455     :param content_type: (str)
1457     :param delimiter: (str)
1459     :returns: (dict) response headers
1461     self._assert_account()
1462     self.container = dst_container
1463     src_path = path4url(src_container, src_object)
1464     r = self.object_put(
1465         dst_object or src_object,
1469         source_version=source_version,
1470         source_account=source_account,
1472         content_type=content_type,
1473         delimiter=delimiter)
# move_object: same shape as copy_object; the `def move_object(` line is
# likewise not visible in this excerpt.
1477         self, src_container, src_object, dst_container,
1479         source_account=None,
1480         source_version=None,
1485     :param src_container: (str) source container
1487     :param src_object: (str) source object path
1489     :param dst_container: (str) destination container
1491     :param dst_object: (str) destination object path
1493     :param source_account: (str) account to move from
1495     :param source_version: (str) source object version
1497     :param public: (bool)
1499     :param content_type: (str)
1501     :param delimiter: (str)
1503     :returns: (dict) response headers
1505     self._assert_account()
1506     self.container = dst_container
1507     dst_object = dst_object or src_object
1508     src_path = path4url(src_container, src_object)
1509     r = self.object_put(
1514         source_account=source_account,
1515         source_version=source_version,
1517         content_type=content_type,
1518         delimiter=delimiter)
# List accounts sharing with self.account via a raw GET (set_param +
# self.get), with optional pagination (limit/marker).
1521 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1522     """Get accounts that share with self.account
1526     :param marker: (str)
1530     self._assert_account()
1532     self.set_param('format', 'json')
1533     self.set_param('limit', limit, iff=limit is not None)
1534     self.set_param('marker', marker, iff=marker is not None)
1537     success = kwargs.pop('success', (200, 204))
1538     r = self.get(path, *args, success=success, **kwargs)
# Fetch the version list of an object (format=json, version='list').
1541 def get_object_versionlist(self, obj):
1543     :param obj: (str) remote object path
1547     self._assert_container()
1548     r = self.object_get(obj, format='json', version='list')
1549     return r.json['versions']