# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO
import json

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    if not a_range:
        return (start, end)
    (rstart, rend) = a_range.split('-')
    (rstart, rend) = (int(rstart), int(rend))
    if rstart > end or rend < start:
        return (0, 0)
    return (max(start, rstart), min(end, rend))
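
# Worked example (added for clarity): clamping the block [4194304, 8388607]
# to a requested range '6000000-7000000' yields (6000000, 7000000), while a
# block that does not overlap the range at all yields (0, 0).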


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(
            base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead
        :param sizelimit: (int) container total size limit in bytes
        :param versioning: (str) can be auto or whatever supported by server
        :param metadata: (dict) Custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}
        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
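
    # Illustrative usage sketch (not part of the original module; BASE_URL,
    # TOKEN and the container name are placeholders):
    #
    #   client = PithosClient(BASE_URL, TOKEN, account='user@example.com')
    #   client.create_container('backups', sizelimit=1024 ** 3)  # 1GB limit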

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
            return r.headers
        finally:
            self.container = cnt_back_up

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path
        :param f: open file descriptor
        :param withHashFile: (bool)
        :param size: (int) size of data to upload
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        """
        self._assert_container()
        if withHashFile:
            data = f.read()
            try:
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
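
    # Illustrative sketch (assumption, not from the original file): after
    # uploading the parts of a large object as pics/video.mp4/part01,
    # part02, ..., a single manifest object can stitch them together:
    #
    #   client.container = 'pics'
    #   client.create_object_by_manifestation(
    #       'video.mp4', content_type='video/mp4')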

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source
        :param size: (int) size of data to upload from source
        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
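
    # For example (illustrative numbers): with a 4MB container block size and
    # a 10MB file, nblocks = 1 + (10485760 - 1) // 4194304 = 3, i.e. two full
    # 4MB blocks plus one 2MB tail block.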

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None, public=None,
            success=(201, 409)):
        r = self.object_put(
            obj, format='json', hashmap=True, json=json,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions, public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
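
    # Note (added): the hashmap PUT above implements the Pithos upload
    # shortcut - a 201 means the server already had every block and the
    # object was created outright, while a 409 returns the list of block
    # hashes the server is still missing, which the caller then uploads.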
    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()
        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg
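
    # Resulting bookkeeping (illustrative): for the 10MB file above,
    #   hashes = [h0, h1, h2]                 # block order is preserved
    #   hmap = {h0: (0, 4194304), h1: (4194304, 4194304),
    #           h2: (8388608, 2097152)}
    # so any hash the server reports as missing maps straight to an
    # (offset, length) slice of the source file.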

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """Upload missing blocks asynchronously"""
        self._init_thread_limit()
        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    upload_gen.next()
            flying = unfinished
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                upload_gen.next()
        return [failure.kwargs['hash'] for failure in failures]
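
    # Note (added): a 502 from the gateway is taken as a sign of too many
    # parallel PUTs, so the pool size is pinned down to the current thread
    # limit; hashes of blocks whose threads failed are returned as still
    # missing and retried by the caller.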

    def upload_object(
            self, obj, f, size=None,
            hash_cb=None, upload_cb=None,
            if_etag_match=None, if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None, public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path
        :param f: open file descriptor (rb)
        :param hash_cb: optional progress.bar object for calculating hashes
        :param upload_cb: optional progress.bar object for uploading
        :param if_etag_match: (str) Push that value to the if-match header at
            file creation
        :param if_not_exist: (bool) If true, the file is uploaded ONLY if it
            does not already exist remotely, otherwise the operation fails.
            This covers the case where an object with the same path is
            created while the object is being uploaded.
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}
        :param public: (bool)
        :param container_info_cache: (dict) if given, avoid redundant calls
            to server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes, hmap=hmap, fileobj=f, hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                upload_gen.next()
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing, hmap, f, upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                details = ['%s' % hash for hash in missing]
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            json=hashmap,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
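
    # Illustrative usage sketch (placeholder names and paths):
    #
    #   with open('/tmp/backup.tar', 'rb') as f:
    #       client.container = 'backups'
    #       client.upload_object(
    #           'backup.tar', f, content_type='application/x-tar')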

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None, upload_cb=None,
            if_etag_match=None, if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None, public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path
        :param input_str: (str) upload content
        :param hash_cb: optional progress.bar object for calculating hashes
        :param upload_cb: optional progress.bar object for uploading
        :param if_etag_match: (str) Push that value to the if-match header at
            file creation
        :param if_not_exist: (bool) If true, the file is uploaded ONLY if it
            does not already exist remotely, otherwise the operation fails.
            This covers the case where an object with the same path is
            created while the object is being uploaded.
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}
        :param public: (bool)
        :param container_info_cache: (dict) if given, avoid redundant calls
            to server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % h for h in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            json=hashmap,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
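
    # Illustrative sketch (placeholder names): upload an in-memory string as
    # an object without touching the local filesystem:
    #
    #   client.upload_from_string(
    #       'notes.txt', 'hello pithos\n', content_type='text/plain')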

    # download_* auxiliary methods

    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve the object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size / blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in more than one block, so map each
            # hash to the list of block indices that contain it
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize, e.g. if the
            range is 10-100, all blocks are written to normal_position - 10
        """
        finished = []
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
            finished.append(flying.pop(key))
        return finished

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None, version=None,
            resume=False, range_str=None,
            if_match=None, if_none_match=None,
            if_modified_since=None, if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path
        :param dst: open file descriptor (wb+)
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param resume: (bool) if set, preserve already downloaded file parts
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formatted date
        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
        (
            blocksize, blockhash, total_size, hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))

        if dst.isatty():
            self._dump_blocks_sync(
                obj, hash_list, blocksize, total_size, dst, range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj, remote_hashes, blocksize, total_size, dst,
                blockhash, resume, range_str, **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
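
    # Illustrative usage sketch (placeholder paths): resume a partial
    # download into an existing local file:
    #
    #   with open('/tmp/backup.tar', 'wb+') as dst:
    #       client.download_object('backup.tar', dst, resume=True)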

    def download_to_string(
            self, obj,
            download_cb=None, version=None, range_str=None,
            if_match=None, if_none_match=None,
            if_modified_since=None, if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formatted date
        :param if_unmodified_since: (str) formatted date
        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
        (
            blocksize, blockhash, total_size, hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                self._watch_thread_limit(flying.values())
                flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()

    # Command Progress Bar methods

    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except StopIteration:
                pass

    def _complete_cb(self):
        while hasattr(self, 'progress_bar_gen'):
            try:
                self.progress_bar_gen.next()
            except StopIteration:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formatted date
        :param if_unmodified_since: (str) formatted date
        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)
        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r.headers

    def del_account_group(self, group):
        """:param group: (str)"""
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """:returns: (dict)"""
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    # def get_account_versioning(self):
    #     return filter_in(
    #         self.get_account_info(),
    #         'X-Account-Policy-Versioning',
    #         exactMatch=True)

    def get_account_meta(self, until=None):
        """:param until: (str) formatted date"""
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """:returns: (dict)"""
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        :returns: (dict) response headers
        """
        assert isinstance(metapairs, dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        :returns: (dict) response headers
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    # def set_account_quota(self, quota):
    #     """:param quota: (int)"""
    #     self.account_post(update=True, quota=quota)

    # def set_account_versioning(self, versioning):
    #     """:param versioning: (str)"""
    #     r = self.account_post(update=True, versioning=versioning)
    #     return r.headers

    def list_containers(self):
        """:returns: (dict)"""
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date
        :param delimiter: (str) pass '/' to also delete the container
            contents rather than require an empty container
        :raises ClientError: 404 Container does not exist
        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)
        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)
        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise
        return r.headers

    def get_container_meta(self, until=None):
        """:param until: (str) formatted date"""
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """:param until: (str) formatted date"""
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """:param metapairs: (dict) {key1: val1, key2: val2, ...}"""
        assert isinstance(metapairs, dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """:param limit: (int)"""
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """:param versioning: (str)"""
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path
        :param until: (str) formatted date
        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert isinstance(metapairs, dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path
        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        public_url = info['x-object-public']
        if public_url.startswith('http'):
            return public_url
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, public_url)

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (dict) response headers
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers
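
    # Illustrative sketch (assumption): publishing yields a URL that can be
    # fetched without a token:
    #
    #   url = client.publish_object('reports/2013.pdf')
    #   # ... share url, then revoke access:
    #   client.unpublish_object('reports/2013.pdf')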

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path
        :param version: (str)
        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path
        :param version: (str)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """:param obj: (str) remote object path"""
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing', exactMatch=True)
        reply = {}
        for perm in r.get('x-object-sharing', '').split(';'):
            if not perm:
                continue
            if '=' not in perm:
                raise ClientError('Incorrect reply format')
            (key, val) = perm.strip().split('=')
            reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path
        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed
        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed
        :returns: (dict) response headers
        """
        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (dict) response headers
        """
        return self.set_object_sharing(obj)
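
    # Illustrative sketch (names are placeholders):
    #
    #   client.set_object_sharing(
    #       'reports/2013.pdf',
    #       read_permission=['friend@example.com', 'mygroup'],
    #       write_permission=False)  # False clears previous write grants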

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path
        :param source_file: open file descriptor
        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        unfinished[key] = thread
                        continue
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path
        :param upto_bytes: max number of bytes to leave on file
        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path
        :param start: (int) position in bytes to start overwriting from
        :param end: (int) position in bytes to stop overwriting at
        :param source_file: open file descriptor
        :param upload_cb: progress.bar for uploading
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)
            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_version: (str) source object version
        :param source_account: (str) account to copy from
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_account: (str) account to move from
        :param source_version: (str) source object version
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
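
    # Illustrative sketch (placeholder names): server-side copy and move,
    # no data passes through the client:
    #
    #   client.copy_object('pics', 'a.jpg', 'pics-backup')
    #   client.move_object('pics', 'a.jpg', 'trash', dst_object='old-a.jpg')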

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)
        :param marker: (str)
        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']
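
    # Illustrative sketch (assuming each entry is a [version, timestamp]
    # pair, as returned by the Pithos API):
    #
    #   for version, timestamp in client.get_object_versionlist('a.txt'):
    #       print version, timestamp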