1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """Synnefo Pithos+ API client"""
def __init__(self, base_url, token, account=None, container=None):
    """Initialize a Pithos+ client.

    :param base_url: (str) the Pithos+ API endpoint URL
    :param token: (str) user authentication token
    :param account: (str) account identifier, if already known
    :param container: (str) default container to operate on
    """
    # All state handling is delegated to the REST-level base class.
    super(PithosClient, self).__init__(base_url, token, account, container)
74 def purge_container(self, container=None):
75 """Delete an empty container and destroy associated blocks
77 cnt_back_up = self.container
79 self.container = container or cnt_back_up
80 self.container_delete(until=unicode(time()))
82 self.container = cnt_back_up
84 def upload_object_unchunked(
89 content_encoding=None,
90 content_disposition=None,
95 :param obj: (str) remote object path
97 :param f: open file descriptor
99 :param withHashFile: (bool)
101 :param size: (int) size of data to upload
105 :param content_encoding: (str)
107 :param content_disposition: (str)
109 :param content_type: (str)
111 :param sharing: {'read':[user and/or grp names],
112 'write':[usr and/or grp names]}
114 :param public: (bool)
116 :returns: (dict) created object metadata
118 self._assert_container()
124 data = json.dumps(json.loads(data))
126 raise ClientError('"%s" is not json-formated' % f.name, 1)
128 msg = '"%s" is not a valid hashmap file' % f.name
129 raise ClientError(msg, 1)
132 data = f.read(size) if size else f.read()
137 content_encoding=content_encoding,
138 content_disposition=content_disposition,
139 content_type=content_type,
145 def create_object_by_manifestation(
148 content_encoding=None,
149 content_disposition=None,
154 :param obj: (str) remote object path
158 :param content_encoding: (str)
160 :param content_disposition: (str)
162 :param content_type: (str)
164 :param sharing: {'read':[user and/or grp names],
165 'write':[usr and/or grp names]}
167 :param public: (bool)
169 :returns: (dict) created object metadata
171 self._assert_container()
176 content_encoding=content_encoding,
177 content_disposition=content_disposition,
178 content_type=content_type,
181 manifest='%s/%s' % (self.container, obj))
184 # upload_* auxiliary methods
185 def _put_block_async(self, data, hash):
186 event = SilentEvent(method=self._put_block, data=data, hash=hash)
190 def _put_block(self, data, hash):
191 r = self.container_post(
193 content_type='application/octet-stream',
194 content_length=len(data),
197 assert r.json[0] == hash, 'Local hash does not match server'
199 def _get_file_block_info(self, fileobj, size=None, cache=None):
201 :param fileobj: (file descriptor) source
203 :param size: (int) size of data to upload from source
205 :param cache: (dict) if provided, cache container info response to
206 avoid redundant calls
208 if isinstance(cache, dict):
210 meta = cache[self.container]
212 meta = self.get_container_info()
213 cache[self.container] = meta
215 meta = self.get_container_info()
216 blocksize = int(meta['x-container-block-size'])
217 blockhash = meta['x-container-block-hash']
218 size = size if size is not None else fstat(fileobj.fileno()).st_size
219 nblocks = 1 + (size - 1) // blocksize
220 return (blocksize, blockhash, size, nblocks)
222 def _create_object_or_get_missing_hashes(
229 if_etag_not_match=None,
230 content_encoding=None,
231 content_disposition=None,
239 content_type=content_type,
241 if_etag_match=if_etag_match,
242 if_etag_not_match=if_etag_not_match,
243 content_encoding=content_encoding,
244 content_disposition=content_disposition,
245 permissions=permissions,
248 return (None if r.status_code == 201 else r.json), r.headers
250 def _calculate_blocks_for_upload(
251 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
255 hash_gen = hash_cb(nblocks)
258 for i in range(nblocks):
259 block = fileobj.read(min(blocksize, size - offset))
261 hash = _pithos_hash(block, blockhash)
263 hmap[hash] = (offset, bytes)
267 msg = 'Failed to calculate uploaded blocks:'
268 ' Offset and object size do not match'
269 assert offset == size, msg
271 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
272 """upload missing blocks asynchronously"""
274 self._init_thread_limit()
279 offset, bytes = hmap[hash]
281 data = fileobj.read(bytes)
282 r = self._put_block_async(data, hash)
284 unfinished = self._watch_thread_limit(flying)
285 for thread in set(flying).difference(unfinished):
287 failures.append(thread)
290 ClientError) and thread.exception.status == 502:
291 self.POOLSIZE = self._thread_limit
292 elif thread.isAlive():
293 flying.append(thread)
301 for thread in flying:
304 failures.append(thread)
311 return [failure.kwargs['hash'] for failure in failures]
321 content_encoding=None,
322 content_disposition=None,
326 container_info_cache=None):
327 """Upload an object using multiple connections (threads)
329 :param obj: (str) remote object path
331 :param f: open file descriptor (rb)
333 :param hash_cb: optional progress.bar object for calculating hashes
335 :param upload_cb: optional progress.bar object for uploading
339 :param if_etag_match: (str) Push that value to if-match header at file
342 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
343 it does not exist remotely, otherwise the operation will fail.
344 Involves the case of an object with the same path is created while
345 the object is being uploaded.
347 :param content_encoding: (str)
349 :param content_disposition: (str)
351 :param content_type: (str)
353 :param sharing: {'read':[user and/or grp names],
354 'write':[usr and/or grp names]}
356 :param public: (bool)
358 :param container_info_cache: (dict) if given, avoid redundant calls to
359 server for container info (block size and hash information)
361 self._assert_container()
364 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
365 f, size, container_info_cache)
366 (hashes, hmap, offset) = ([], {}, 0)
368 content_type = 'application/octet-stream'
370 self._calculate_blocks_for_upload(
377 hashmap = dict(bytes=size, hashes=hashes)
378 missing, obj_headers = self._create_object_or_get_missing_hashes(
380 content_type=content_type,
382 if_etag_match=if_etag_match,
383 if_etag_not_match='*' if if_not_exist else None,
384 content_encoding=content_encoding,
385 content_disposition=content_disposition,
393 upload_gen = upload_cb(len(missing))
394 for i in range(len(missing), len(hashmap['hashes']) + 1):
405 sendlog.info('%s blocks missing' % len(missing))
406 num_of_blocks = len(missing)
407 missing = self._upload_missing_blocks(
413 if num_of_blocks == len(missing):
416 num_of_blocks = len(missing)
421 '%s blocks failed to upload' % len(missing),
422 details=['%s' % thread.exception for thread in missing])
423 except KeyboardInterrupt:
424 sendlog.info('- - - wait for threads to finish')
425 for thread in activethreads():
433 content_type=content_type,
434 if_etag_match=if_etag_match,
435 if_etag_not_match='*' if if_not_exist else None,
443 def upload_from_string(
444 self, obj, input_str,
450 content_encoding=None,
451 content_disposition=None,
455 container_info_cache=None):
456 """Upload an object using multiple connections (threads)
458 :param obj: (str) remote object path
460 :param input_str: (str) upload content
462 :param hash_cb: optional progress.bar object for calculating hashes
464 :param upload_cb: optional progress.bar object for uploading
468 :param if_etag_match: (str) Push that value to if-match header at file
471 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
472 it does not exist remotely, otherwise the operation will fail.
473 Involves the case of an object with the same path is created while
474 the object is being uploaded.
476 :param content_encoding: (str)
478 :param content_disposition: (str)
480 :param content_type: (str)
482 :param sharing: {'read':[user and/or grp names],
483 'write':[usr and/or grp names]}
485 :param public: (bool)
487 :param container_info_cache: (dict) if given, avoid redundant calls to
488 server for container info (block size and hash information)
490 self._assert_container()
492 blocksize, blockhash, size, nblocks = self._get_file_block_info(
493 fileobj=None, size=len(input_str), cache=container_info_cache)
494 (hashes, hmap, offset) = ([], {}, 0)
496 content_type = 'application/octet-stream'
500 for blockid in range(nblocks):
501 start = blockid * blocksize
502 block = input_str[start: (start + blocksize)]
503 hashes.append(_pithos_hash(block, blockhash))
504 hmap[hashes[blockid]] = (start, block)
506 hashmap = dict(bytes=size, hashes=hashes)
507 missing, obj_headers = self._create_object_or_get_missing_hashes(
509 content_type=content_type,
511 if_etag_match=if_etag_match,
512 if_etag_not_match='*' if if_not_exist else None,
513 content_encoding=content_encoding,
514 content_disposition=content_disposition,
519 num_of_missing = len(missing)
522 self.progress_bar_gen = upload_cb(nblocks)
523 for i in range(nblocks + 1 - num_of_missing):
529 while tries and missing:
533 offset, block = hmap[hash]
534 bird = self._put_block_async(block, hash)
536 unfinished = self._watch_thread_limit(flying)
537 for thread in set(flying).difference(unfinished):
539 failures.append(thread.kwargs['hash'])
541 flying.append(thread)
545 for thread in flying:
548 failures.append(thread.kwargs['hash'])
551 if missing and len(missing) == old_failures:
553 old_failures = len(missing)
556 '%s blocks failed to upload' % len(missing),
557 details=['%s' % thread.exception for thread in missing])
558 except KeyboardInterrupt:
559 sendlog.info('- - - wait for threads to finish')
560 for thread in activethreads():
568 content_type=content_type,
569 if_etag_match=if_etag_match,
570 if_etag_not_match='*' if if_not_exist else None,
578 # download_* auxiliary methods
579 def _get_remote_blocks_info(self, obj, **restargs):
580 #retrieve object hashmap
581 myrange = restargs.pop('data_range', None)
582 hashmap = self.get_object_hashmap(obj, **restargs)
583 restargs['data_range'] = myrange
584 blocksize = int(hashmap['block_size'])
585 blockhash = hashmap['block_hash']
586 total_size = hashmap['bytes']
587 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
589 for i, h in enumerate(hashmap['hashes']):
# map each hash to every block index where it occurs (hashes may repeat)
592 map_dict[h].append(i)
595 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
597 def _dump_blocks_sync(
598 self, obj, remote_hashes, blocksize, total_size, dst, range,
600 for blockid, blockhash in enumerate(remote_hashes):
602 start = blocksize * blockid
603 is_last = start + blocksize > total_size
604 end = (total_size - 1) if is_last else (start + blocksize - 1)
605 (start, end) = _range_up(start, end, range)
606 args['data_range'] = 'bytes=%s-%s' % (start, end)
607 r = self.object_get(obj, success=(200, 206), **args)
612 def _get_block_async(self, obj, **args):
613 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
617 def _hash_from_file(self, fp, start, size, blockhash):
619 block = fp.read(size)
620 h = newhashlib(blockhash)
621 h.update(block.strip('\x00'))
622 return hexlify(h.digest())
624 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
625 """write the results of a greenleted rest call to a file
627 :param offset: the offset of the file up to blocksize
628 - e.g. if the range is 10-100, all blocks will be written to
631 for key, g in flying.items():
636 block = g.value.content
637 for block_start in blockids[key]:
638 local_file.seek(block_start + offset)
639 local_file.write(block)
645 def _dump_blocks_async(
646 self, obj, remote_hashes, blocksize, total_size, local_file,
647 blockhash=None, resume=False, filerange=None, **restargs):
648 file_size = fstat(local_file.fileno()).st_size if resume else 0
650 blockid_dict = dict()
652 if filerange is not None:
653 rstart = int(filerange.split('-')[0])
654 offset = rstart if blocksize > rstart else rstart % blocksize
656 self._init_thread_limit()
657 for block_hash, blockids in remote_hashes.items():
658 blockids = [blk * blocksize for blk in blockids]
659 unsaved = [blk for blk in blockids if not (
660 blk < file_size and block_hash == self._hash_from_file(
661 local_file, blk, blocksize, blockhash))]
662 self._cb_next(len(blockids) - len(unsaved))
665 self._watch_thread_limit(flying.values())
667 flying, blockid_dict, local_file, offset,
669 end = total_size - 1 if (
670 key + blocksize > total_size) else key + blocksize - 1
671 start, end = _range_up(key, end, filerange)
675 restargs['async_headers'] = {
676 'Range': 'bytes=%s-%s' % (start, end)}
677 flying[key] = self._get_block_async(obj, **restargs)
678 blockid_dict[key] = unsaved
680 for thread in flying.values():
682 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
692 if_modified_since=None,
693 if_unmodified_since=None):
694 """Download an object (multiple connections, random blocks)
696 :param obj: (str) remote object path
698 :param dst: open file descriptor (wb+)
700 :param download_cb: optional progress.bar object for downloading
702 :param version: (str) file version
704 :param resume: (bool) if set, preserve already downloaded file parts
706 :param range_str: (str) from, to are file positions (int) in bytes
708 :param if_match: (str)
710 :param if_none_match: (str)
712 :param if_modified_since: (str) formated date
714 :param if_unmodified_since: (str) formated date"""
717 data_range=None if range_str is None else 'bytes=%s' % range_str,
719 if_none_match=if_none_match,
720 if_modified_since=if_modified_since,
721 if_unmodified_since=if_unmodified_since)
728 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
729 assert total_size >= 0
732 self.progress_bar_gen = download_cb(len(hash_list))
736 self._dump_blocks_sync(
745 self._dump_blocks_async(
756 dst.truncate(total_size)
760 def download_to_string(
767 if_modified_since=None,
768 if_unmodified_since=None):
769 """Download an object to a string (multiple connections). This method
770 uses threads for http requests, but stores all content in memory.
772 :param obj: (str) remote object path
774 :param download_cb: optional progress.bar object for downloading
776 :param version: (str) file version
778 :param range_str: (str) from, to are file positions (int) in bytes
780 :param if_match: (str)
782 :param if_none_match: (str)
784 :param if_modified_since: (str) formated date
786 :param if_unmodified_since: (str) formated date
788 :returns: (str) the whole object contents
792 data_range=None if range_str is None else 'bytes=%s' % range_str,
794 if_none_match=if_none_match,
795 if_modified_since=if_modified_since,
796 if_unmodified_since=if_unmodified_since)
803 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
804 assert total_size >= 0
807 self.progress_bar_gen = download_cb(len(hash_list))
810 num_of_blocks = len(remote_hashes)
811 ret = [''] * num_of_blocks
812 self._init_thread_limit()
814 for blockid, blockhash in enumerate(remote_hashes):
815 start = blocksize * blockid
816 is_last = start + blocksize > total_size
817 end = (total_size - 1) if is_last else (start + blocksize - 1)
818 (start, end) = _range_up(start, end, range_str)
820 self._watch_thread_limit(flying.values())
821 flying[blockid] = self._get_block_async(obj, **restargs)
822 for runid, thread in flying.items():
823 if (blockid + 1) == num_of_blocks:
825 elif thread.isAlive():
828 raise thread.exception
829 ret[runid] = thread.value.content
# Command progress-bar methods
835 def _cb_next(self, step=1):
836 if hasattr(self, 'progress_bar_gen'):
838 for i in xrange(step):
839 self.progress_bar_gen.next()
843 def _complete_cb(self):
846 self.progress_bar_gen.next()
850 def get_object_hashmap(
855 if_modified_since=None,
856 if_unmodified_since=None,
859 :param obj: (str) remote object path
861 :param if_match: (str)
863 :param if_none_match: (str)
865 :param if_modified_since: (str) formated date
867 :param if_unmodified_since: (str) formated date
869 :param data_range: (str) from-to where from and to are integers
870 denoting file positions in bytes
879 if_etag_match=if_match,
880 if_etag_not_match=if_none_match,
881 if_modified_since=if_modified_since,
882 if_unmodified_since=if_unmodified_since,
883 data_range=data_range)
884 except ClientError as err:
885 if err.status == 304 or err.status == 412:
890 def set_account_group(self, group, usernames):
894 :param usernames: (list)
896 self.account_post(update=True, groups={group: usernames})
def del_account_group(self, group):
    """Delete a user group from the account.

    :param group: (str) name of the group to remove
    """
    # Posting the group with an empty member list removes it server-side.
    emptied = {group: []}
    self.account_post(update=True, groups=emptied)
904 def get_account_info(self, until=None):
906 :param until: (str) formated date
910 r = self.account_head(until=until)
911 if r.status_code == 401:
912 raise ClientError("No authorization", status=401)
915 def get_account_quota(self):
920 self.get_account_info(),
921 'X-Account-Policy-Quota',
924 def get_account_versioning(self):
929 self.get_account_info(),
930 'X-Account-Policy-Versioning',
933 def get_account_meta(self, until=None):
935 :meta until: (str) formated date
939 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
941 def get_account_group(self):
945 return filter_in(self.get_account_info(), 'X-Account-Group-')
947 def set_account_meta(self, metapairs):
949 :param metapairs: (dict) {key1:val1, key2:val2, ...}
951 assert(type(metapairs) is dict)
952 self.account_post(update=True, metadata=metapairs)
954 def del_account_meta(self, metakey):
956 :param metakey: (str) metadatum key
958 self.account_post(update=True, metadata={metakey: ''})
def set_account_quota(self, quota):
    """Set the account quota policy.

    :param quota: (int) new account quota, in bytes
    """
    # Partial update: only the quota policy header is modified.
    post_args = dict(update=True, quota=quota)
    self.account_post(**post_args)
968 def set_account_versioning(self, versioning):
970 "param versioning: (str)
972 self.account_post(update=True, versioning=versioning)
974 def list_containers(self):
978 r = self.account_get()
981 def del_container(self, until=None, delimiter=None):
983 :param until: (str) formated date
985 :param delimiter: (str) with / empty container
987 :raises ClientError: 404 Container does not exist
989 :raises ClientError: 409 Container is not empty
991 self._assert_container()
992 r = self.container_delete(
995 success=(204, 404, 409))
996 if r.status_code == 404:
998 'Container "%s" does not exist' % self.container,
1000 elif r.status_code == 409:
1002 'Container "%s" is not empty' % self.container,
1005 def get_container_versioning(self, container=None):
1007 :param container: (str)
1011 cnt_back_up = self.container
1013 self.container = container or cnt_back_up
1015 self.get_container_info(),
1016 'X-Container-Policy-Versioning')
1018 self.container = cnt_back_up
1020 def get_container_limit(self, container=None):
1022 :param container: (str)
1026 cnt_back_up = self.container
1028 self.container = container or cnt_back_up
1030 self.get_container_info(),
1031 'X-Container-Policy-Quota')
1033 self.container = cnt_back_up
1035 def get_container_info(self, until=None):
1037 :param until: (str) formated date
1041 :raises ClientError: 404 Container not found
1044 r = self.container_head(until=until)
1045 except ClientError as err:
1046 err.details.append('for container %s' % self.container)
1050 def get_container_meta(self, until=None):
1052 :param until: (str) formated date
1057 self.get_container_info(until=until),
1060 def get_container_object_meta(self, until=None):
1062 :param until: (str) formated date
1067 self.get_container_info(until=until),
1068 'X-Container-Object-Meta')
1070 def set_container_meta(self, metapairs):
1072 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1074 assert(type(metapairs) is dict)
1075 self.container_post(update=True, metadata=metapairs)
1077 def del_container_meta(self, metakey):
1079 :param metakey: (str) metadatum key
1081 self.container_post(update=True, metadata={metakey: ''})
def set_container_limit(self, limit):
    """Set the storage limit (quota policy) of the current container.

    :param limit: (int) container size limit, in bytes
    """
    # The server exposes the container limit via the quota policy field.
    post_args = dict(update=True, quota=limit)
    self.container_post(**post_args)
1089 def set_container_versioning(self, versioning):
1091 :param versioning: (str)
1093 self.container_post(update=True, versioning=versioning)
1095 def del_object(self, obj, until=None, delimiter=None):
1097 :param obj: (str) remote object path
1099 :param until: (str) formated date
1101 :param delimiter: (str)
1103 self._assert_container()
1104 self.object_delete(obj, until=until, delimiter=delimiter)
1106 def set_object_meta(self, obj, metapairs):
1108 :param obj: (str) remote object path
1110 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1112 assert(type(metapairs) is dict)
1113 self.object_post(obj, update=True, metadata=metapairs)
1115 def del_object_meta(self, obj, metakey):
1117 :param obj: (str) remote object path
1119 :param metakey: (str) metadatum key
1121 self.object_post(obj, update=True, metadata={metakey: ''})
1123 def publish_object(self, obj):
1125 :param obj: (str) remote object path
1127 :returns: (str) access url
1129 self.object_post(obj, update=True, public=True)
1130 info = self.get_object_info(obj)
1131 pref, sep, rest = self.base_url.partition('//')
1132 base = rest.split('/')[0]
1133 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1135 def unpublish_object(self, obj):
1137 :param obj: (str) remote object path
1139 self.object_post(obj, update=True, public=False)
1141 def get_object_info(self, obj, version=None):
1143 :param obj: (str) remote object path
1145 :param version: (str)
1150 r = self.object_head(obj, version=version)
1152 except ClientError as ce:
1153 if ce.status == 404:
1154 raise ClientError('Object %s not found' % obj, status=404)
1157 def get_object_meta(self, obj, version=None):
1159 :param obj: (str) remote object path
1161 :param version: (str)
1166 self.get_object_info(obj, version=version),
1169 def get_object_sharing(self, obj):
1171 :param obj: (str) remote object path
1176 self.get_object_info(obj),
1181 perms = r['x-object-sharing'].split(';')
1186 raise ClientError('Incorrect reply format')
1187 (key, val) = perm.strip().split('=')
1191 def set_object_sharing(
1193 read_permition=False, write_permition=False):
1194 """Give read/write permisions to an object.
1196 :param obj: (str) remote object path
1198 :param read_permition: (list - bool) users and user groups that get
1199 read permition for this object - False means all previous read
1200 permissions will be removed
1202 :param write_perimition: (list - bool) of users and user groups to get
1203 write permition for this object - False means all previous write
1204 permissions will be removed
1207 perms = dict(read=read_permition or '', write=write_permition or '')
1208 self.object_post(obj, update=True, permissions=perms)
1210 def del_object_sharing(self, obj):
1212 :param obj: (str) remote object path
1214 self.set_object_sharing(obj)
1216 def append_object(self, obj, source_file, upload_cb=None):
1218 :param obj: (str) remote object path
1220 :param source_file: open file descriptor
1222 :param upload_db: progress.bar for uploading
1225 self._assert_container()
1226 meta = self.get_container_info()
1227 blocksize = int(meta['x-container-block-size'])
1228 filesize = fstat(source_file.fileno()).st_size
1229 nblocks = 1 + (filesize - 1) // blocksize
1232 upload_gen = upload_cb(nblocks)
1234 for i in range(nblocks):
1235 block = source_file.read(min(blocksize, filesize - offset))
1236 offset += len(block)
1240 content_range='bytes */*',
1241 content_type='application/octet-stream',
1242 content_length=len(block),
1248 def truncate_object(self, obj, upto_bytes):
1250 :param obj: (str) remote object path
1252 :param upto_bytes: max number of bytes to leave on file
1257 content_range='bytes 0-%s/*' % upto_bytes,
1258 content_type='application/octet-stream',
1259 object_bytes=upto_bytes,
1260 source_object=path4url(self.container, obj))
1262 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1263 """Overwrite a part of an object from local source file
1265 :param obj: (str) remote object path
1267 :param start: (int) position in bytes to start overwriting from
1269 :param end: (int) position in bytes to stop overwriting at
1271 :param source_file: open file descriptor
1273 :param upload_db: progress.bar for uploading
1276 r = self.get_object_info(obj)
1277 rf_size = int(r['content-length'])
1278 if rf_size < int(start):
1280 'Range start exceeds file size',
1282 elif rf_size < int(end):
1284 'Range end exceeds file size',
1286 self._assert_container()
1287 meta = self.get_container_info()
1288 blocksize = int(meta['x-container-block-size'])
1289 filesize = fstat(source_file.fileno()).st_size
1290 datasize = int(end) - int(start) + 1
1291 nblocks = 1 + (datasize - 1) // blocksize
1294 upload_gen = upload_cb(nblocks)
1296 for i in range(nblocks):
1297 read_size = min(blocksize, filesize - offset, datasize - offset)
1298 block = source_file.read(read_size)
1302 content_type='application/octet-stream',
1303 content_length=len(block),
1304 content_range='bytes %s-%s/*' % (
1306 start + offset + len(block) - 1),
1308 offset += len(block)
1314 self, src_container, src_object, dst_container,
1316 source_version=None,
1317 source_account=None,
1322 :param src_container: (str) source container
1324 :param src_object: (str) source object path
1326 :param dst_container: (str) destination container
1328 :param dst_object: (str) destination object path
1330 :param source_version: (str) source object version
1332 :param source_account: (str) account to copy from
1334 :param public: (bool)
1336 :param content_type: (str)
1338 :param delimiter: (str)
1340 self._assert_account()
1341 self.container = dst_container
1342 src_path = path4url(src_container, src_object)
1344 dst_object or src_object,
1348 source_version=source_version,
1349 source_account=source_account,
1351 content_type=content_type,
1352 delimiter=delimiter)
1355 self, src_container, src_object, dst_container,
1357 source_account=None,
1358 source_version=None,
1363 :param src_container: (str) source container
1365 :param src_object: (str) source object path
1367 :param dst_container: (str) destination container
1369 :param dst_object: (str) destination object path
1371 :param source_account: (str) account to move from
1373 :param source_version: (str) source object version
1375 :param public: (bool)
1377 :param content_type: (str)
1379 :param delimiter: (str)
1381 self._assert_account()
1382 self.container = dst_container
1383 dst_object = dst_object or src_object
1384 src_path = path4url(src_container, src_object)
1390 source_account=source_account,
1391 source_version=source_version,
1393 content_type=content_type,
1394 delimiter=delimiter)
1396 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1397 """Get accounts that share with self.account
1401 :param marker: (str)
1405 self._assert_account()
1407 self.set_param('format', 'json')
1408 self.set_param('limit', limit, iff=limit is not None)
1409 self.set_param('marker', marker, iff=marker is not None)
1412 success = kwargs.pop('success', (200, 204))
1413 r = self.get(path, *args, success=success, **kwargs)
1416 def get_object_versionlist(self, obj):
1418 :param obj: (str) remote object path
1422 self._assert_container()
1423 r = self.object_get(obj, format='json', version='list')
1424 return r.json['versions']