1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """Synnefo Pithos+ API client"""
def __init__(self, base_url, token, account=None, container=None):
    """Set up the client by delegating to the PithosRestClient constructor.

    The four arguments are forwarded unchanged and in the same order.

    :param base_url: forwarded to the parent constructor
        (presumably the Pithos+ service endpoint -- confirm)

    :param token: forwarded to the parent constructor
        (presumably the user authentication token -- confirm)

    :param account: optional, forwarded to the parent constructor

    :param container: optional, forwarded to the parent constructor
    """
    super(PithosClient, self).__init__(base_url, token, account, container)
76 container=None, sizelimit=None, versioning=None, metadata=None):
78 :param container: (str) if not given, self.container is used instead
80 :param sizelimit: (int) container total size limit in bytes
82 :param versioning: (str) can be auto or whatever supported by server
84 :param metadata: (dict) Custom user-defined metadata of the form
85 { 'name1': 'value1', 'name2': 'value2', ... }
87 :returns: (dict) response headers
89 cnt_back_up = self.container
91 self.container = container or cnt_back_up
92 r = self.container_put(
93 quota=sizelimit, versioning=versioning, metadata=metadata)
96 self.container = cnt_back_up
98 def purge_container(self, container=None):
99 """Delete an empty container and destroy associated blocks
101 cnt_back_up = self.container
103 self.container = container or cnt_back_up
104 r = self.container_delete(until=unicode(time()))
106 self.container = cnt_back_up
109 def upload_object_unchunked(
114 content_encoding=None,
115 content_disposition=None,
120 :param obj: (str) remote object path
122 :param f: open file descriptor
124 :param withHashFile: (bool)
126 :param size: (int) size of data to upload
130 :param content_encoding: (str)
132 :param content_disposition: (str)
134 :param content_type: (str)
136 :param sharing: {'read':[user and/or grp names],
137 'write':[usr and/or grp names]}
139 :param public: (bool)
141 :returns: (dict) created object metadata
143 self._assert_container()
149 data = json.dumps(json.loads(data))
151 raise ClientError('"%s" is not json-formated' % f.name, 1)
153 msg = '"%s" is not a valid hashmap file' % f.name
154 raise ClientError(msg, 1)
157 data = f.read(size) if size else f.read()
162 content_encoding=content_encoding,
163 content_disposition=content_disposition,
164 content_type=content_type,
170 def create_object_by_manifestation(
173 content_encoding=None,
174 content_disposition=None,
179 :param obj: (str) remote object path
183 :param content_encoding: (str)
185 :param content_disposition: (str)
187 :param content_type: (str)
189 :param sharing: {'read':[user and/or grp names],
190 'write':[usr and/or grp names]}
192 :param public: (bool)
194 :returns: (dict) created object metadata
196 self._assert_container()
201 content_encoding=content_encoding,
202 content_disposition=content_disposition,
203 content_type=content_type,
206 manifest='%s/%s' % (self.container, obj))
209 # upload_* auxiliary methods
210 def _put_block_async(self, data, hash):
211 event = SilentEvent(method=self._put_block, data=data, hash=hash)
215 def _put_block(self, data, hash):
216 r = self.container_post(
218 content_type='application/octet-stream',
219 content_length=len(data),
222 assert r.json[0] == hash, 'Local hash does not match server'
224 def _get_file_block_info(self, fileobj, size=None, cache=None):
226 :param fileobj: (file descriptor) source
228 :param size: (int) size of data to upload from source
230 :param cache: (dict) if provided, cache container info response to
231 avoid redundant calls
233 if isinstance(cache, dict):
235 meta = cache[self.container]
237 meta = self.get_container_info()
238 cache[self.container] = meta
240 meta = self.get_container_info()
241 blocksize = int(meta['x-container-block-size'])
242 blockhash = meta['x-container-block-hash']
243 size = size if size is not None else fstat(fileobj.fileno()).st_size
244 nblocks = 1 + (size - 1) // blocksize
245 return (blocksize, blockhash, size, nblocks)
247 def _create_object_or_get_missing_hashes(
254 if_etag_not_match=None,
255 content_encoding=None,
256 content_disposition=None,
264 content_type=content_type,
266 if_etag_match=if_etag_match,
267 if_etag_not_match=if_etag_not_match,
268 content_encoding=content_encoding,
269 content_disposition=content_disposition,
270 permissions=permissions,
273 return (None if r.status_code == 201 else r.json), r.headers
275 def _calculate_blocks_for_upload(
276 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
280 hash_gen = hash_cb(nblocks)
283 for i in range(nblocks):
284 block = fileobj.read(min(blocksize, size - offset))
286 hash = _pithos_hash(block, blockhash)
288 hmap[hash] = (offset, bytes)
292 msg = 'Failed to calculate uploaded blocks:'
293 ' Offset and object size do not match'
294 assert offset == size, msg
296 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297 """upload missing blocks asynchronously"""
299 self._init_thread_limit()
304 offset, bytes = hmap[hash]
306 data = fileobj.read(bytes)
307 r = self._put_block_async(data, hash)
309 unfinished = self._watch_thread_limit(flying)
310 for thread in set(flying).difference(unfinished):
312 failures.append(thread)
315 ClientError) and thread.exception.status == 502:
316 self.POOLSIZE = self._thread_limit
317 elif thread.isAlive():
318 flying.append(thread)
326 for thread in flying:
329 failures.append(thread)
336 return [failure.kwargs['hash'] for failure in failures]
346 content_encoding=None,
347 content_disposition=None,
351 container_info_cache=None):
352 """Upload an object using multiple connections (threads)
354 :param obj: (str) remote object path
356 :param f: open file descriptor (rb)
358 :param hash_cb: optional progress.bar object for calculating hashes
360 :param upload_cb: optional progress.bar object for uploading
364 :param if_etag_match: (str) Push that value to if-match header at file
367 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368 it does not exist remotely, otherwise the operation will fail.
369 Involves the case of an object with the same path is created while
370 the object is being uploaded.
372 :param content_encoding: (str)
374 :param content_disposition: (str)
376 :param content_type: (str)
378 :param sharing: {'read':[user and/or grp names],
379 'write':[usr and/or grp names]}
381 :param public: (bool)
383 :param container_info_cache: (dict) if given, avoid redundant calls to
384 server for container info (block size and hash information)
386 self._assert_container()
389 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390 f, size, container_info_cache)
391 (hashes, hmap, offset) = ([], {}, 0)
393 content_type = 'application/octet-stream'
395 self._calculate_blocks_for_upload(
402 hashmap = dict(bytes=size, hashes=hashes)
403 missing, obj_headers = self._create_object_or_get_missing_hashes(
405 content_type=content_type,
407 if_etag_match=if_etag_match,
408 if_etag_not_match='*' if if_not_exist else None,
409 content_encoding=content_encoding,
410 content_disposition=content_disposition,
418 upload_gen = upload_cb(len(missing))
419 for i in range(len(missing), len(hashmap['hashes']) + 1):
430 sendlog.info('%s blocks missing' % len(missing))
431 num_of_blocks = len(missing)
432 missing = self._upload_missing_blocks(
438 if num_of_blocks == len(missing):
441 num_of_blocks = len(missing)
446 '%s blocks failed to upload' % len(missing),
447 details=['%s' % thread.exception for thread in missing])
448 except KeyboardInterrupt:
449 sendlog.info('- - - wait for threads to finish')
450 for thread in activethreads():
458 content_type=content_type,
459 if_etag_match=if_etag_match,
460 if_etag_not_match='*' if if_not_exist else None,
468 def upload_from_string(
469 self, obj, input_str,
475 content_encoding=None,
476 content_disposition=None,
480 container_info_cache=None):
481 """Upload an object using multiple connections (threads)
483 :param obj: (str) remote object path
485 :param input_str: (str) upload content
487 :param hash_cb: optional progress.bar object for calculating hashes
489 :param upload_cb: optional progress.bar object for uploading
493 :param if_etag_match: (str) Push that value to if-match header at file
496 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
497 it does not exist remotely, otherwise the operation will fail.
498 Involves the case of an object with the same path is created while
499 the object is being uploaded.
501 :param content_encoding: (str)
503 :param content_disposition: (str)
505 :param content_type: (str)
507 :param sharing: {'read':[user and/or grp names],
508 'write':[usr and/or grp names]}
510 :param public: (bool)
512 :param container_info_cache: (dict) if given, avoid redundant calls to
513 server for container info (block size and hash information)
515 self._assert_container()
517 blocksize, blockhash, size, nblocks = self._get_file_block_info(
518 fileobj=None, size=len(input_str), cache=container_info_cache)
519 (hashes, hmap, offset) = ([], {}, 0)
521 content_type = 'application/octet-stream'
525 for blockid in range(nblocks):
526 start = blockid * blocksize
527 block = input_str[start: (start + blocksize)]
528 hashes.append(_pithos_hash(block, blockhash))
529 hmap[hashes[blockid]] = (start, block)
531 hashmap = dict(bytes=size, hashes=hashes)
532 missing, obj_headers = self._create_object_or_get_missing_hashes(
534 content_type=content_type,
536 if_etag_match=if_etag_match,
537 if_etag_not_match='*' if if_not_exist else None,
538 content_encoding=content_encoding,
539 content_disposition=content_disposition,
544 num_of_missing = len(missing)
547 self.progress_bar_gen = upload_cb(nblocks)
548 for i in range(nblocks + 1 - num_of_missing):
554 while tries and missing:
558 offset, block = hmap[hash]
559 bird = self._put_block_async(block, hash)
561 unfinished = self._watch_thread_limit(flying)
562 for thread in set(flying).difference(unfinished):
564 failures.append(thread.kwargs['hash'])
566 flying.append(thread)
570 for thread in flying:
573 failures.append(thread.kwargs['hash'])
576 if missing and len(missing) == old_failures:
578 old_failures = len(missing)
581 '%s blocks failed to upload' % len(missing),
582 details=['%s' % thread.exception for thread in missing])
583 except KeyboardInterrupt:
584 sendlog.info('- - - wait for threads to finish')
585 for thread in activethreads():
593 content_type=content_type,
594 if_etag_match=if_etag_match,
595 if_etag_not_match='*' if if_not_exist else None,
603 # download_* auxiliary methods
604 def _get_remote_blocks_info(self, obj, **restargs):
605 #retrieve object hashmap
606 myrange = restargs.pop('data_range', None)
607 hashmap = self.get_object_hashmap(obj, **restargs)
608 restargs['data_range'] = myrange
609 blocksize = int(hashmap['block_size'])
610 blockhash = hashmap['block_hash']
611 total_size = hashmap['bytes']
612 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
614 for i, h in enumerate(hashmap['hashes']):
615 # map_dict[h] = i CHAGE
617 map_dict[h].append(i)
620 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
622 def _dump_blocks_sync(
623 self, obj, remote_hashes, blocksize, total_size, dst, range,
625 for blockid, blockhash in enumerate(remote_hashes):
627 start = blocksize * blockid
628 is_last = start + blocksize > total_size
629 end = (total_size - 1) if is_last else (start + blocksize - 1)
630 (start, end) = _range_up(start, end, range)
631 args['data_range'] = 'bytes=%s-%s' % (start, end)
632 r = self.object_get(obj, success=(200, 206), **args)
637 def _get_block_async(self, obj, **args):
638 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
642 def _hash_from_file(self, fp, start, size, blockhash):
644 block = fp.read(size)
645 h = newhashlib(blockhash)
646 h.update(block.strip('\x00'))
647 return hexlify(h.digest())
649 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
650 """write the results of a greenleted rest call to a file
652 :param offset: the offset of the file up to blocksize
653 - e.g. if the range is 10-100, all blocks will be written to
656 for key, g in flying.items():
661 block = g.value.content
662 for block_start in blockids[key]:
663 local_file.seek(block_start + offset)
664 local_file.write(block)
670 def _dump_blocks_async(
671 self, obj, remote_hashes, blocksize, total_size, local_file,
672 blockhash=None, resume=False, filerange=None, **restargs):
673 file_size = fstat(local_file.fileno()).st_size if resume else 0
675 blockid_dict = dict()
677 if filerange is not None:
678 rstart = int(filerange.split('-')[0])
679 offset = rstart if blocksize > rstart else rstart % blocksize
681 self._init_thread_limit()
682 for block_hash, blockids in remote_hashes.items():
683 blockids = [blk * blocksize for blk in blockids]
684 unsaved = [blk for blk in blockids if not (
685 blk < file_size and block_hash == self._hash_from_file(
686 local_file, blk, blocksize, blockhash))]
687 self._cb_next(len(blockids) - len(unsaved))
690 self._watch_thread_limit(flying.values())
692 flying, blockid_dict, local_file, offset,
694 end = total_size - 1 if (
695 key + blocksize > total_size) else key + blocksize - 1
696 start, end = _range_up(key, end, filerange)
700 restargs['async_headers'] = {
701 'Range': 'bytes=%s-%s' % (start, end)}
702 flying[key] = self._get_block_async(obj, **restargs)
703 blockid_dict[key] = unsaved
705 for thread in flying.values():
707 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
717 if_modified_since=None,
718 if_unmodified_since=None):
719 """Download an object (multiple connections, random blocks)
721 :param obj: (str) remote object path
723 :param dst: open file descriptor (wb+)
725 :param download_cb: optional progress.bar object for downloading
727 :param version: (str) file version
729 :param resume: (bool) if set, preserve already downloaded file parts
731 :param range_str: (str) from, to are file positions (int) in bytes
733 :param if_match: (str)
735 :param if_none_match: (str)
737 :param if_modified_since: (str) formated date
739 :param if_unmodified_since: (str) formated date"""
742 data_range=None if range_str is None else 'bytes=%s' % range_str,
744 if_none_match=if_none_match,
745 if_modified_since=if_modified_since,
746 if_unmodified_since=if_unmodified_since)
753 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
754 assert total_size >= 0
757 self.progress_bar_gen = download_cb(len(hash_list))
761 self._dump_blocks_sync(
770 self._dump_blocks_async(
781 dst.truncate(total_size)
785 def download_to_string(
792 if_modified_since=None,
793 if_unmodified_since=None):
794 """Download an object to a string (multiple connections). This method
795 uses threads for http requests, but stores all content in memory.
797 :param obj: (str) remote object path
799 :param download_cb: optional progress.bar object for downloading
801 :param version: (str) file version
803 :param range_str: (str) from, to are file positions (int) in bytes
805 :param if_match: (str)
807 :param if_none_match: (str)
809 :param if_modified_since: (str) formated date
811 :param if_unmodified_since: (str) formated date
813 :returns: (str) the whole object contents
817 data_range=None if range_str is None else 'bytes=%s' % range_str,
819 if_none_match=if_none_match,
820 if_modified_since=if_modified_since,
821 if_unmodified_since=if_unmodified_since)
828 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
829 assert total_size >= 0
832 self.progress_bar_gen = download_cb(len(hash_list))
835 num_of_blocks = len(remote_hashes)
836 ret = [''] * num_of_blocks
837 self._init_thread_limit()
840 for blockid, blockhash in enumerate(remote_hashes):
841 start = blocksize * blockid
842 is_last = start + blocksize > total_size
843 end = (total_size - 1) if is_last else (start + blocksize - 1)
844 (start, end) = _range_up(start, end, range_str)
846 self._watch_thread_limit(flying.values())
847 flying[blockid] = self._get_block_async(obj, **restargs)
848 for runid, thread in flying.items():
849 if (blockid + 1) == num_of_blocks:
851 elif thread.isAlive():
854 raise thread.exception
855 ret[runid] = thread.value.content
859 except KeyboardInterrupt:
860 sendlog.info('- - - wait for threads to finish')
861 for thread in activethreads():
864 #Command Progress Bar method
865 def _cb_next(self, step=1):
866 if hasattr(self, 'progress_bar_gen'):
868 for i in xrange(step):
869 self.progress_bar_gen.next()
873 def _complete_cb(self):
876 self.progress_bar_gen.next()
880 def get_object_hashmap(
885 if_modified_since=None,
886 if_unmodified_since=None,
889 :param obj: (str) remote object path
891 :param if_match: (str)
893 :param if_none_match: (str)
895 :param if_modified_since: (str) formated date
897 :param if_unmodified_since: (str) formated date
899 :param data_range: (str) from-to where from and to are integers
900 denoting file positions in bytes
909 if_etag_match=if_match,
910 if_etag_not_match=if_none_match,
911 if_modified_since=if_modified_since,
912 if_unmodified_since=if_unmodified_since,
913 data_range=data_range)
914 except ClientError as err:
915 if err.status == 304 or err.status == 412:
920 def set_account_group(self, group, usernames):
924 :param usernames: (list)
926 r = self.account_post(update=True, groups={group: usernames})
def del_account_group(self, group):
    """Empty an account group by posting it with no members.

    :param group: (str) name of the group to clear
    """
    emptied_groups = {group: []}
    self.account_post(update=True, groups=emptied_groups)
935 def get_account_info(self, until=None):
937 :param until: (str) formated date
941 r = self.account_head(until=until)
942 if r.status_code == 401:
943 raise ClientError("No authorization", status=401)
946 def get_account_quota(self):
951 self.get_account_info(),
952 'X-Account-Policy-Quota',
955 #def get_account_versioning(self):
960 # self.get_account_info(),
961 # 'X-Account-Policy-Versioning',
def get_account_meta(self, until=None):
    """Select the user metadata entries out of the account information.

    :param until: (str) formatted date, forwarded to get_account_info

    :returns: the result of filtering the account information with the
        'X-Account-Meta-' prefix (presumably a dict -- confirm)
    """
    account_info = self.get_account_info(until=until)
    return filter_in(account_info, 'X-Account-Meta-')
def get_account_group(self):
    """Select the group entries out of the account information.

    :returns: the result of filtering the account information with the
        'X-Account-Group-' prefix (presumably a dict -- confirm)
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Group-')
978 def set_account_meta(self, metapairs):
980 :param metapairs: (dict) {key1:val1, key2:val2, ...}
982 assert(type(metapairs) is dict)
983 r = self.account_post(update=True, metadata=metapairs)
986 def del_account_meta(self, metakey):
988 :param metakey: (str) metadatum key
990 r = self.account_post(update=True, metadata={metakey: ''})
993 #def set_account_quota(self, quota):
995 # :param quota: (int)
997 # self.account_post(update=True, quota=quota)
999 #def set_account_versioning(self, versioning):
1001 # :param versioning: (str)
1003 # r = self.account_post(update=True, versioning=versioning)
1006 def list_containers(self):
1010 r = self.account_get()
1013 def del_container(self, until=None, delimiter=None):
1015 :param until: (str) formated date
1017 :param delimiter: (str) with / empty container
1019 :raises ClientError: 404 Container does not exist
1021 :raises ClientError: 409 Container is not empty
1023 self._assert_container()
1024 r = self.container_delete(
1026 delimiter=delimiter,
1027 success=(204, 404, 409))
1028 if r.status_code == 404:
1030 'Container "%s" does not exist' % self.container,
1032 elif r.status_code == 409:
1034 'Container "%s" is not empty' % self.container,
1038 def get_container_versioning(self, container=None):
1040 :param container: (str)
1044 cnt_back_up = self.container
1046 self.container = container or cnt_back_up
1048 self.get_container_info(),
1049 'X-Container-Policy-Versioning')
1051 self.container = cnt_back_up
1053 def get_container_limit(self, container=None):
1055 :param container: (str)
1059 cnt_back_up = self.container
1061 self.container = container or cnt_back_up
1063 self.get_container_info(),
1064 'X-Container-Policy-Quota')
1066 self.container = cnt_back_up
1068 def get_container_info(self, until=None):
1070 :param until: (str) formated date
1074 :raises ClientError: 404 Container not found
1077 r = self.container_head(until=until)
1078 except ClientError as err:
1079 err.details.append('for container %s' % self.container)
1083 def get_container_meta(self, until=None):
1085 :param until: (str) formated date
1090 self.get_container_info(until=until),
1093 def get_container_object_meta(self, until=None):
1095 :param until: (str) formated date
1100 self.get_container_info(until=until),
1101 'X-Container-Object-Meta')
1103 def set_container_meta(self, metapairs):
1105 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1107 assert(type(metapairs) is dict)
1108 r = self.container_post(update=True, metadata=metapairs)
1111 def del_container_meta(self, metakey):
1113 :param metakey: (str) metadatum key
1115 :returns: (dict) response headers
1117 r = self.container_post(update=True, metadata={metakey: ''})
1120 def set_container_limit(self, limit):
1124 r = self.container_post(update=True, quota=limit)
1127 def set_container_versioning(self, versioning):
1129 :param versioning: (str)
1131 r = self.container_post(update=True, versioning=versioning)
1134 def del_object(self, obj, until=None, delimiter=None):
1136 :param obj: (str) remote object path
1138 :param until: (str) formated date
1140 :param delimiter: (str)
1142 self._assert_container()
1143 r = self.object_delete(obj, until=until, delimiter=delimiter)
1146 def set_object_meta(self, obj, metapairs):
1148 :param obj: (str) remote object path
1150 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1152 assert(type(metapairs) is dict)
1153 r = self.object_post(obj, update=True, metadata=metapairs)
1156 def del_object_meta(self, obj, metakey):
1158 :param obj: (str) remote object path
1160 :param metakey: (str) metadatum key
1162 r = self.object_post(obj, update=True, metadata={metakey: ''})
def publish_object(self, obj):
    """Make an object public and build the URL it can be reached at.

    The URL consists of the scheme and host part of self.base_url, followed
    by the value of the object's 'x-object-public' entry.

    :param obj: (str) remote object path

    :returns: (str) access url
    """
    self.object_post(obj, update=True, public=True)
    object_info = self.get_object_info(obj)
    # Keep only "<scheme>//<host>" of the base url; any path part is dropped
    scheme, slashes, remainder = self.base_url.partition('//')
    host = remainder.partition('/')[0]
    public_path = object_info['x-object-public']
    return '%s%s%s/%s' % (scheme, slashes, host, public_path)
1177 def unpublish_object(self, obj):
1179 :param obj: (str) remote object path
1181 r = self.object_post(obj, update=True, public=False)
1184 def get_object_info(self, obj, version=None):
1186 :param obj: (str) remote object path
1188 :param version: (str)
1193 r = self.object_head(obj, version=version)
1195 except ClientError as ce:
1196 if ce.status == 404:
1197 raise ClientError('Object %s not found' % obj, status=404)
1200 def get_object_meta(self, obj, version=None):
1202 :param obj: (str) remote object path
1204 :param version: (str)
1209 self.get_object_info(obj, version=version),
1212 def get_object_sharing(self, obj):
1214 :param obj: (str) remote object path
1219 self.get_object_info(obj),
1224 perms = r['x-object-sharing'].split(';')
1229 raise ClientError('Incorrect reply format')
1230 (key, val) = perm.strip().split('=')
1234 def set_object_sharing(
1236 read_permission=False, write_permission=False):
"""Give read/write permissions to an object.
1239 :param obj: (str) remote object path
1241 :param read_permission: (list - bool) users and user groups that get
1242 read permission for this object - False means all previous read
1243 permissions will be removed
1245 :param write_permission: (list - bool) of users and user groups to get
1246 write permission for this object - False means all previous write
1247 permissions will be removed
1249 :returns: (dict) response headers
1252 perms = dict(read=read_permission or '', write=write_permission or '')
1253 r = self.object_post(obj, update=True, permissions=perms)
def del_object_sharing(self, obj):
    """Reset the sharing of an object.

    Delegates to set_object_sharing with its default arguments.

    :param obj: (str) remote object path

    :returns: whatever set_object_sharing returns
    """
    outcome = self.set_object_sharing(obj)
    return outcome
1262 def append_object(self, obj, source_file, upload_cb=None):
1264 :param obj: (str) remote object path
1266 :param source_file: open file descriptor
:param upload_cb: progress.bar for uploading
1270 self._assert_container()
1271 meta = self.get_container_info()
1272 blocksize = int(meta['x-container-block-size'])
1273 filesize = fstat(source_file.fileno()).st_size
1274 nblocks = 1 + (filesize - 1) // blocksize
1278 self.progress_bar_gen = upload_cb(nblocks)
1281 self._init_thread_limit()
1283 for i in range(nblocks):
1284 block = source_file.read(min(blocksize, filesize - offset))
1285 offset += len(block)
1287 self._watch_thread_limit(flying.values())
1289 flying[i] = SilentEvent(
1290 method=self.object_post,
1293 content_range='bytes */*',
1294 content_type='application/octet-stream',
1295 content_length=len(block),
1299 for key, thread in flying.items():
1300 if thread.isAlive():
1302 unfinished[key] = thread
1305 if thread.exception:
1306 raise thread.exception
1307 headers[key] = thread.value.headers
1310 except KeyboardInterrupt:
1311 sendlog.info('- - - wait for threads to finish')
1312 for thread in activethreads():
1315 from time import sleep
1316 sleep(2 * len(activethreads()))
1317 return headers.values()
1319 def truncate_object(self, obj, upto_bytes):
1321 :param obj: (str) remote object path
1323 :param upto_bytes: max number of bytes to leave on file
1325 :returns: (dict) response headers
1327 r = self.object_post(
1330 content_range='bytes 0-%s/*' % upto_bytes,
1331 content_type='application/octet-stream',
1332 object_bytes=upto_bytes,
1333 source_object=path4url(self.container, obj))
1336 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1337 """Overwrite a part of an object from local source file
1339 :param obj: (str) remote object path
1341 :param start: (int) position in bytes to start overwriting from
1343 :param end: (int) position in bytes to stop overwriting at
1345 :param source_file: open file descriptor
:param upload_cb: progress.bar for uploading
1350 r = self.get_object_info(obj)
1351 rf_size = int(r['content-length'])
1352 if rf_size < int(start):
1354 'Range start exceeds file size',
1356 elif rf_size < int(end):
1358 'Range end exceeds file size',
1360 self._assert_container()
1361 meta = self.get_container_info()
1362 blocksize = int(meta['x-container-block-size'])
1363 filesize = fstat(source_file.fileno()).st_size
1364 datasize = int(end) - int(start) + 1
1365 nblocks = 1 + (datasize - 1) // blocksize
1368 self.progress_bar_gen = upload_cb(nblocks)
1371 for i in range(nblocks):
1372 read_size = min(blocksize, filesize - offset, datasize - offset)
1373 block = source_file.read(read_size)
1374 r = self.object_post(
1377 content_type='application/octet-stream',
1378 content_length=len(block),
1379 content_range='bytes %s-%s/*' % (
1381 start + offset + len(block) - 1),
1383 headers.append(dict(r.headers))
1384 offset += len(block)
1390 self, src_container, src_object, dst_container,
1392 source_version=None,
1393 source_account=None,
1398 :param src_container: (str) source container
1400 :param src_object: (str) source object path
1402 :param dst_container: (str) destination container
1404 :param dst_object: (str) destination object path
1406 :param source_version: (str) source object version
1408 :param source_account: (str) account to copy from
1410 :param public: (bool)
1412 :param content_type: (str)
1414 :param delimiter: (str)
1416 :returns: (dict) response headers
1418 self._assert_account()
1419 self.container = dst_container
1420 src_path = path4url(src_container, src_object)
1421 r = self.object_put(
1422 dst_object or src_object,
1426 source_version=source_version,
1427 source_account=source_account,
1429 content_type=content_type,
1430 delimiter=delimiter)
1434 self, src_container, src_object, dst_container,
1436 source_account=None,
1437 source_version=None,
1442 :param src_container: (str) source container
1444 :param src_object: (str) source object path
1446 :param dst_container: (str) destination container
1448 :param dst_object: (str) destination object path
1450 :param source_account: (str) account to move from
1452 :param source_version: (str) source object version
1454 :param public: (bool)
1456 :param content_type: (str)
1458 :param delimiter: (str)
1460 :returns: (dict) response headers
1462 self._assert_account()
1463 self.container = dst_container
1464 dst_object = dst_object or src_object
1465 src_path = path4url(src_container, src_object)
1466 r = self.object_put(
1471 source_account=source_account,
1472 source_version=source_version,
1474 content_type=content_type,
1475 delimiter=delimiter)
1478 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1479 """Get accounts that share with self.account
1483 :param marker: (str)
1487 self._assert_account()
1489 self.set_param('format', 'json')
1490 self.set_param('limit', limit, iff=limit is not None)
1491 self.set_param('marker', marker, iff=marker is not None)
1494 success = kwargs.pop('success', (200, 204))
1495 r = self.get(path, *args, success=success, **kwargs)
def get_object_versionlist(self, obj):
    """Fetch the version list of an object.

    :param obj: (str) remote object path

    :returns: the 'versions' entry of the JSON reply
    """
    self._assert_container()
    reply = self.object_get(obj, format='json', version='list')
    versions = reply.json['versions']
    return versions