# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

import json

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    # trailing zero-bytes are stripped before hashing, per the Pithos+
    # block hashing convention
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    """Clip the [start, end] block range against a requested 'from-to' range

    Returns (0, 0) if the block falls entirely outside the requested range.
    """
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
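
# A quick illustration of _range_up (values are illustrative): with
# 4096-byte blocks, the second block spans bytes 4096-8191. Clipping it
# against a requested range of '6000-12000' keeps only the overlap:
#
#   >>> _range_up(4096, 8191, '6000-12000')
#   (6000, 8191)
#   >>> _range_up(4096, 8191, '9000-12000')  # no overlap
#   (0, 0)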


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
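
    # Construction sketch (the endpoint, token and names below are
    # hypothetical):
    #
    #   client = PithosClient(
    #       'https://storage.example.org/pithos', '<user token>',
    #       account='user@example.org', container='pithos')
    #
    # Most methods operate on self.container; pass a container= argument
    # (where supported) to target another container temporarily.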

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object in a single request (no block splitting)

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f is a json-formatted hashmap file

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()

        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a remote object that manifests already-uploaded parts

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods

    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
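
    # Block arithmetic sketch (numbers are illustrative): with a common
    # 4 MiB block size, a 10 MiB object needs
    # 1 + (10485760 - 1) // 4194304 = 3 blocks -- two full blocks plus a
    # 2 MiB tail. The tail is zero-stripped before hashing, so its hash
    # matches what the server computes.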

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        # PUT the hashmap: 201 means the object was fully assembled from
        # blocks the server already has; otherwise the response body lists
        # the hashes of blocks that must still be uploaded
        r = self.object_put(
            obj,
            format=format,
            hashmap=hashmap,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing, hmap, f, upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
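
    # Usage sketch for the chunked uploader (paths and names below are
    # hypothetical):
    #
    #   cache = {}
    #   with open('/tmp/backup.tar', 'rb') as f:
    #       client.upload_object(
    #           'backups/backup.tar', f,
    #           content_type='application/x-tar',
    #           container_info_cache=cache)
    #
    # Only blocks the server reports as missing are transferred, so
    # re-uploading a mostly unchanged file is cheap.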

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object from an in-memory string, using multiple
        connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
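
    # upload_from_string sketch: small in-memory payloads go through the same
    # missing-block negotiation as upload_object (names are illustrative):
    #
    #   client.upload_from_string(
    #       'notes/hello.txt', 'hello world\n', content_type='text/plain')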

    # download_* auxiliary methods

    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve the object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # the same hash may appear in more than one block, so map each hash
        # to the list of block indices it occupies
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
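
    # Shape of the returned map (hash values shortened for illustration):
    # for an object whose block hashes are ['aa', 'bb', 'aa'], map_dict is
    # {'aa': [0, 2], 'bb': [1]}, so a block shared by two positions is
    # fetched once and written twice.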

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # rstrip, not strip: only trailing zero-bytes are ignored, matching
        # the _pithos_hash convention (strip would also drop leading NULs
        # and mis-hash blocks that begin with zero bytes)
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of asynchronous rest calls to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            # on resume, skip blocks that are already on disk and intact
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end == 0:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to' where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
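
    # Download usage sketch (the local path is hypothetical): terminals get
    # the sequential dump, while seekable files take the threaded path and
    # can resume partial downloads:
    #
    #   with open('/tmp/backup.tar', 'wb+') as dst:
    #       client.download_object('backups/backup.tar', dst, resume=True)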

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) 'from-to' where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                restargs['data_range'] = 'bytes=%s-%s' % (start, end)
                self._watch_thread_limit(flying.values())
                flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

    # Command Progress Bar methods

    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
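
    # The returned hashmap has this shape (values are illustrative, hashes
    # shortened):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256',
    #    'bytes': 10485760, 'hashes': ['aa...', 'bb...', 'cc...']}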

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', the container contents are deleted
            as well (the container is emptied first)

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
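
    # URL assembly sketch (values are illustrative): with
    # base_url = 'https://storage.example.org/pithos' and an
    # 'x-object-public' value of 'public/abc123', the method returns
    # 'https://storage.example.org/public/abc123' -- scheme and host come
    # from base_url, the public path comes from the server.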

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to
            get write permission for this object - False means all previous
            write permissions will be removed

        :returns: (dict) response headers
        """
        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if (i + 1) == nblocks:
                            thread.join()
                        else:
                            unfinished[key] = thread
                            continue
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()
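
    # The 'bytes */*' content_range tells Pithos to append each posted block
    # at the current end of the object, so successive POSTs grow the object
    # without rewriting existing blocks.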

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)
        self._complete_cb()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']