1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
def _pithos_hash(block, blockhash):
    # Compute the Pithos+ hash of a single data block: apply the
    # container-advertised hash algorithm (blockhash, e.g. 'sha256')
    # to the block with trailing NUL padding stripped.
    # NOTE(review): this view of the source appears truncated here --
    # the statement returning the digest is missing; confirm upstream.
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize a Pithos+ client.

        :param base_url: (str) Pithos+ service endpoint URL

        :param token: (str) user authentication token

        :param account: (str) account identifier; may also be set later

        :param container: (str) default container; may also be set later
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
76 container=None, sizelimit=None, versioning=None, metadata=None):
78 :param container: (str) if not given, self.container is used instead
80 :param sizelimit: (int) container total size limit in bytes
82 :param versioning: (str) can be auto or whatever supported by server
84 :param metadata: (dict) Custom user-defined metadata of the form
85 { 'name1': 'value1', 'name2': 'value2', ... }
87 :returns: (dict) response headers
89 cnt_back_up = self.container
91 self.container = container or cnt_back_up
92 r = self.container_put(
93 quota=sizelimit, versioning=versioning, metadata=metadata)
96 self.container = cnt_back_up
98 def purge_container(self, container=None):
99 """Delete an empty container and destroy associated blocks
101 cnt_back_up = self.container
103 self.container = container or cnt_back_up
104 r = self.container_delete(until=unicode(time()))
106 self.container = cnt_back_up
109 def upload_object_unchunked(
114 content_encoding=None,
115 content_disposition=None,
120 :param obj: (str) remote object path
122 :param f: open file descriptor
124 :param withHashFile: (bool)
126 :param size: (int) size of data to upload
130 :param content_encoding: (str)
132 :param content_disposition: (str)
134 :param content_type: (str)
136 :param sharing: {'read':[user and/or grp names],
137 'write':[usr and/or grp names]}
139 :param public: (bool)
141 :returns: (dict) created object metadata
143 self._assert_container()
149 data = json.dumps(json.loads(data))
151 raise ClientError('"%s" is not json-formated' % f.name, 1)
153 msg = '"%s" is not a valid hashmap file' % f.name
154 raise ClientError(msg, 1)
157 data = f.read(size) if size else f.read()
162 content_encoding=content_encoding,
163 content_disposition=content_disposition,
164 content_type=content_type,
170 def create_object_by_manifestation(
173 content_encoding=None,
174 content_disposition=None,
179 :param obj: (str) remote object path
183 :param content_encoding: (str)
185 :param content_disposition: (str)
187 :param content_type: (str)
189 :param sharing: {'read':[user and/or grp names],
190 'write':[usr and/or grp names]}
192 :param public: (bool)
194 :returns: (dict) created object metadata
196 self._assert_container()
201 content_encoding=content_encoding,
202 content_disposition=content_disposition,
203 content_type=content_type,
206 manifest='%s/%s' % (self.container, obj))
209 # upload_* auxiliary methods
210 def _put_block_async(self, data, hash):
211 event = SilentEvent(method=self._put_block, data=data, hash=hash)
215 def _put_block(self, data, hash):
216 r = self.container_post(
218 content_type='application/octet-stream',
219 content_length=len(data),
222 assert r.json[0] == hash, 'Local hash does not match server'
224 def _get_file_block_info(self, fileobj, size=None, cache=None):
226 :param fileobj: (file descriptor) source
228 :param size: (int) size of data to upload from source
230 :param cache: (dict) if provided, cache container info response to
231 avoid redundant calls
233 if isinstance(cache, dict):
235 meta = cache[self.container]
237 meta = self.get_container_info()
238 cache[self.container] = meta
240 meta = self.get_container_info()
241 blocksize = int(meta['x-container-block-size'])
242 blockhash = meta['x-container-block-hash']
243 size = size if size is not None else fstat(fileobj.fileno()).st_size
244 nblocks = 1 + (size - 1) // blocksize
245 return (blocksize, blockhash, size, nblocks)
247 def _create_object_or_get_missing_hashes(
254 if_etag_not_match=None,
255 content_encoding=None,
256 content_disposition=None,
264 content_type=content_type,
266 if_etag_match=if_etag_match,
267 if_etag_not_match=if_etag_not_match,
268 content_encoding=content_encoding,
269 content_disposition=content_disposition,
270 permissions=permissions,
273 return (None if r.status_code == 201 else r.json), r.headers
275 def _calculate_blocks_for_upload(
276 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
280 hash_gen = hash_cb(nblocks)
283 for i in range(nblocks):
284 block = fileobj.read(min(blocksize, size - offset))
286 hash = _pithos_hash(block, blockhash)
288 hmap[hash] = (offset, bytes)
292 msg = 'Failed to calculate uploaded blocks:'
293 ' Offset and object size do not match'
294 assert offset == size, msg
296 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297 """upload missing blocks asynchronously"""
299 self._init_thread_limit()
304 offset, bytes = hmap[hash]
306 data = fileobj.read(bytes)
307 r = self._put_block_async(data, hash)
309 unfinished = self._watch_thread_limit(flying)
310 for thread in set(flying).difference(unfinished):
312 failures.append(thread)
315 ClientError) and thread.exception.status == 502:
316 self.POOLSIZE = self._thread_limit
317 elif thread.isAlive():
318 flying.append(thread)
326 for thread in flying:
329 failures.append(thread)
336 return [failure.kwargs['hash'] for failure in failures]
346 content_encoding=None,
347 content_disposition=None,
351 container_info_cache=None):
352 """Upload an object using multiple connections (threads)
354 :param obj: (str) remote object path
356 :param f: open file descriptor (rb)
358 :param hash_cb: optional progress.bar object for calculating hashes
360 :param upload_cb: optional progress.bar object for uploading
364 :param if_etag_match: (str) Push that value to if-match header at file
367 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368 it does not exist remotely, otherwise the operation will fail.
369 Involves the case of an object with the same path is created while
370 the object is being uploaded.
372 :param content_encoding: (str)
374 :param content_disposition: (str)
376 :param content_type: (str)
378 :param sharing: {'read':[user and/or grp names],
379 'write':[usr and/or grp names]}
381 :param public: (bool)
383 :param container_info_cache: (dict) if given, avoid redundant calls to
384 server for container info (block size and hash information)
386 self._assert_container()
389 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390 f, size, container_info_cache)
391 (hashes, hmap, offset) = ([], {}, 0)
393 content_type = 'application/octet-stream'
395 self._calculate_blocks_for_upload(
402 hashmap = dict(bytes=size, hashes=hashes)
403 missing, obj_headers = self._create_object_or_get_missing_hashes(
405 content_type=content_type,
407 if_etag_match=if_etag_match,
408 if_etag_not_match='*' if if_not_exist else None,
409 content_encoding=content_encoding,
410 content_disposition=content_disposition,
418 upload_gen = upload_cb(len(missing))
419 for i in range(len(missing), len(hashmap['hashes']) + 1):
430 sendlog.info('%s blocks missing' % len(missing))
431 num_of_blocks = len(missing)
432 missing = self._upload_missing_blocks(
438 if num_of_blocks == len(missing):
441 num_of_blocks = len(missing)
446 details = ['%s' % thread.exception for thread in missing]
448 details = ['Also, failed to read thread exceptions']
450 '%s blocks failed to upload' % len(missing),
452 except KeyboardInterrupt:
453 sendlog.info('- - - wait for threads to finish')
454 for thread in activethreads():
462 content_type=content_type,
463 content_encoding=content_encoding,
464 if_etag_match=if_etag_match,
465 if_etag_not_match='*' if if_not_exist else None,
473 def upload_from_string(
474 self, obj, input_str,
480 content_encoding=None,
481 content_disposition=None,
485 container_info_cache=None):
486 """Upload an object using multiple connections (threads)
488 :param obj: (str) remote object path
490 :param input_str: (str) upload content
492 :param hash_cb: optional progress.bar object for calculating hashes
494 :param upload_cb: optional progress.bar object for uploading
498 :param if_etag_match: (str) Push that value to if-match header at file
501 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
502 it does not exist remotely, otherwise the operation will fail.
503 Involves the case of an object with the same path is created while
504 the object is being uploaded.
506 :param content_encoding: (str)
508 :param content_disposition: (str)
510 :param content_type: (str)
512 :param sharing: {'read':[user and/or grp names],
513 'write':[usr and/or grp names]}
515 :param public: (bool)
517 :param container_info_cache: (dict) if given, avoid redundant calls to
518 server for container info (block size and hash information)
520 self._assert_container()
522 blocksize, blockhash, size, nblocks = self._get_file_block_info(
523 fileobj=None, size=len(input_str), cache=container_info_cache)
524 (hashes, hmap, offset) = ([], {}, 0)
526 content_type = 'application/octet-stream'
530 for blockid in range(nblocks):
531 start = blockid * blocksize
532 block = input_str[start: (start + blocksize)]
533 hashes.append(_pithos_hash(block, blockhash))
534 hmap[hashes[blockid]] = (start, block)
536 hashmap = dict(bytes=size, hashes=hashes)
537 missing, obj_headers = self._create_object_or_get_missing_hashes(
539 content_type=content_type,
541 if_etag_match=if_etag_match,
542 if_etag_not_match='*' if if_not_exist else None,
543 content_encoding=content_encoding,
544 content_disposition=content_disposition,
549 num_of_missing = len(missing)
552 self.progress_bar_gen = upload_cb(nblocks)
553 for i in range(nblocks + 1 - num_of_missing):
559 while tries and missing:
563 offset, block = hmap[hash]
564 bird = self._put_block_async(block, hash)
566 unfinished = self._watch_thread_limit(flying)
567 for thread in set(flying).difference(unfinished):
569 failures.append(thread.kwargs['hash'])
571 flying.append(thread)
575 for thread in flying:
578 failures.append(thread.kwargs['hash'])
581 if missing and len(missing) == old_failures:
583 old_failures = len(missing)
586 '%s blocks failed to upload' % len(missing),
587 details=['%s' % thread.exception for thread in missing])
588 except KeyboardInterrupt:
589 sendlog.info('- - - wait for threads to finish')
590 for thread in activethreads():
598 content_type=content_type,
599 content_encoding=content_encoding,
600 if_etag_match=if_etag_match,
601 if_etag_not_match='*' if if_not_exist else None,
609 # download_* auxiliary methods
610 def _get_remote_blocks_info(self, obj, **restargs):
611 #retrieve object hashmap
612 myrange = restargs.pop('data_range', None)
613 hashmap = self.get_object_hashmap(obj, **restargs)
614 restargs['data_range'] = myrange
615 blocksize = int(hashmap['block_size'])
616 blockhash = hashmap['block_hash']
617 total_size = hashmap['bytes']
618 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
620 for i, h in enumerate(hashmap['hashes']):
# map_dict[h] = i  # CHANGED: map_dict now collects every block index per hash
623 map_dict[h].append(i)
626 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
628 def _dump_blocks_sync(
629 self, obj, remote_hashes, blocksize, total_size, dst, range,
631 for blockid, blockhash in enumerate(remote_hashes):
633 start = blocksize * blockid
634 is_last = start + blocksize > total_size
635 end = (total_size - 1) if is_last else (start + blocksize - 1)
636 (start, end) = _range_up(start, end, range)
637 args['data_range'] = 'bytes=%s-%s' % (start, end)
638 r = self.object_get(obj, success=(200, 206), **args)
643 def _get_block_async(self, obj, **args):
644 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
648 def _hash_from_file(self, fp, start, size, blockhash):
650 block = fp.read(size)
651 h = newhashlib(blockhash)
652 h.update(block.strip('\x00'))
653 return hexlify(h.digest())
655 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
656 """write the results of a greenleted rest call to a file
658 :param offset: the offset of the file up to blocksize
659 - e.g. if the range is 10-100, all blocks will be written to
662 for key, g in flying.items():
667 block = g.value.content
668 for block_start in blockids[key]:
669 local_file.seek(block_start + offset)
670 local_file.write(block)
676 def _dump_blocks_async(
677 self, obj, remote_hashes, blocksize, total_size, local_file,
678 blockhash=None, resume=False, filerange=None, **restargs):
679 file_size = fstat(local_file.fileno()).st_size if resume else 0
681 blockid_dict = dict()
683 if filerange is not None:
684 rstart = int(filerange.split('-')[0])
685 offset = rstart if blocksize > rstart else rstart % blocksize
687 self._init_thread_limit()
688 for block_hash, blockids in remote_hashes.items():
689 blockids = [blk * blocksize for blk in blockids]
690 unsaved = [blk for blk in blockids if not (
691 blk < file_size and block_hash == self._hash_from_file(
692 local_file, blk, blocksize, blockhash))]
693 self._cb_next(len(blockids) - len(unsaved))
696 self._watch_thread_limit(flying.values())
698 flying, blockid_dict, local_file, offset,
700 end = total_size - 1 if (
701 key + blocksize > total_size) else key + blocksize - 1
702 start, end = _range_up(key, end, filerange)
706 restargs['async_headers'] = {
707 'Range': 'bytes=%s-%s' % (start, end)}
708 flying[key] = self._get_block_async(obj, **restargs)
709 blockid_dict[key] = unsaved
711 for thread in flying.values():
713 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
723 if_modified_since=None,
724 if_unmodified_since=None):
725 """Download an object (multiple connections, random blocks)
727 :param obj: (str) remote object path
729 :param dst: open file descriptor (wb+)
731 :param download_cb: optional progress.bar object for downloading
733 :param version: (str) file version
735 :param resume: (bool) if set, preserve already downloaded file parts
737 :param range_str: (str) from, to are file positions (int) in bytes
739 :param if_match: (str)
741 :param if_none_match: (str)
743 :param if_modified_since: (str) formated date
745 :param if_unmodified_since: (str) formated date"""
748 data_range=None if range_str is None else 'bytes=%s' % range_str,
750 if_none_match=if_none_match,
751 if_modified_since=if_modified_since,
752 if_unmodified_since=if_unmodified_since)
759 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
760 assert total_size >= 0
763 self.progress_bar_gen = download_cb(len(hash_list))
767 self._dump_blocks_sync(
776 self._dump_blocks_async(
787 dst.truncate(total_size)
791 def download_to_string(
798 if_modified_since=None,
799 if_unmodified_since=None):
800 """Download an object to a string (multiple connections). This method
801 uses threads for http requests, but stores all content in memory.
803 :param obj: (str) remote object path
805 :param download_cb: optional progress.bar object for downloading
807 :param version: (str) file version
809 :param range_str: (str) from, to are file positions (int) in bytes
811 :param if_match: (str)
813 :param if_none_match: (str)
815 :param if_modified_since: (str) formated date
817 :param if_unmodified_since: (str) formated date
819 :returns: (str) the whole object contents
823 data_range=None if range_str is None else 'bytes=%s' % range_str,
825 if_none_match=if_none_match,
826 if_modified_since=if_modified_since,
827 if_unmodified_since=if_unmodified_since)
834 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
835 assert total_size >= 0
838 self.progress_bar_gen = download_cb(len(hash_list))
841 num_of_blocks = len(remote_hashes)
842 ret = [''] * num_of_blocks
843 self._init_thread_limit()
846 for blockid, blockhash in enumerate(remote_hashes):
847 start = blocksize * blockid
848 is_last = start + blocksize > total_size
849 end = (total_size - 1) if is_last else (start + blocksize - 1)
850 (start, end) = _range_up(start, end, range_str)
852 self._watch_thread_limit(flying.values())
853 flying[blockid] = self._get_block_async(obj, **restargs)
854 for runid, thread in flying.items():
855 if (blockid + 1) == num_of_blocks:
857 elif thread.isAlive():
860 raise thread.exception
861 ret[runid] = thread.value.content
865 except KeyboardInterrupt:
866 sendlog.info('- - - wait for threads to finish')
867 for thread in activethreads():
870 #Command Progress Bar method
871 def _cb_next(self, step=1):
872 if hasattr(self, 'progress_bar_gen'):
874 for i in xrange(step):
875 self.progress_bar_gen.next()
879 def _complete_cb(self):
882 self.progress_bar_gen.next()
886 def get_object_hashmap(
891 if_modified_since=None,
892 if_unmodified_since=None,
895 :param obj: (str) remote object path
897 :param if_match: (str)
899 :param if_none_match: (str)
901 :param if_modified_since: (str) formated date
903 :param if_unmodified_since: (str) formated date
905 :param data_range: (str) from-to where from and to are integers
906 denoting file positions in bytes
915 if_etag_match=if_match,
916 if_etag_not_match=if_none_match,
917 if_modified_since=if_modified_since,
918 if_unmodified_since=if_unmodified_since,
919 data_range=data_range)
920 except ClientError as err:
921 if err.status == 304 or err.status == 412:
926 def set_account_group(self, group, usernames):
930 :param usernames: (list)
932 r = self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Delete an account user group (sets its member list to empty).

        :param group: (str) group name
        """
        self.account_post(update=True, groups={group: []})
941 def get_account_info(self, until=None):
943 :param until: (str) formated date
947 r = self.account_head(until=until)
948 if r.status_code == 401:
949 raise ClientError("No authorization", status=401)
952 def get_account_quota(self):
957 self.get_account_info(),
958 'X-Account-Policy-Quota',
961 #def get_account_versioning(self):
966 # self.get_account_info(),
967 # 'X-Account-Policy-Versioning',
    def get_account_meta(self, until=None):
        """Get custom account metadata (the X-Account-Meta-* headers).

        :param until: (str) formated date

        :returns: (dict) account info headers filtered to X-Account-Meta-
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
    def get_account_group(self):
        """Get the user groups defined on the account.

        :returns: (dict) account info headers filtered to X-Account-Group-
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')
984 def set_account_meta(self, metapairs):
986 :param metapairs: (dict) {key1:val1, key2:val2, ...}
988 assert(type(metapairs) is dict)
989 r = self.account_post(update=True, metadata=metapairs)
992 def del_account_meta(self, metakey):
994 :param metakey: (str) metadatum key
996 r = self.account_post(update=True, metadata={metakey: ''})
999 #def set_account_quota(self, quota):
1001 # :param quota: (int)
1003 # self.account_post(update=True, quota=quota)
1005 #def set_account_versioning(self, versioning):
1007 # :param versioning: (str)
1009 # r = self.account_post(update=True, versioning=versioning)
1012 def list_containers(self):
1016 r = self.account_get()
1019 def del_container(self, until=None, delimiter=None):
1021 :param until: (str) formated date
1023 :param delimiter: (str) with / empty container
1025 :raises ClientError: 404 Container does not exist
1027 :raises ClientError: 409 Container is not empty
1029 self._assert_container()
1030 r = self.container_delete(
1032 delimiter=delimiter,
1033 success=(204, 404, 409))
1034 if r.status_code == 404:
1036 'Container "%s" does not exist' % self.container,
1038 elif r.status_code == 409:
1040 'Container "%s" is not empty' % self.container,
1044 def get_container_versioning(self, container=None):
1046 :param container: (str)
1050 cnt_back_up = self.container
1052 self.container = container or cnt_back_up
1054 self.get_container_info(),
1055 'X-Container-Policy-Versioning')
1057 self.container = cnt_back_up
1059 def get_container_limit(self, container=None):
1061 :param container: (str)
1065 cnt_back_up = self.container
1067 self.container = container or cnt_back_up
1069 self.get_container_info(),
1070 'X-Container-Policy-Quota')
1072 self.container = cnt_back_up
1074 def get_container_info(self, until=None):
1076 :param until: (str) formated date
1080 :raises ClientError: 404 Container not found
1083 r = self.container_head(until=until)
1084 except ClientError as err:
1085 err.details.append('for container %s' % self.container)
1089 def get_container_meta(self, until=None):
1091 :param until: (str) formated date
1096 self.get_container_info(until=until),
1099 def get_container_object_meta(self, until=None):
1101 :param until: (str) formated date
1106 self.get_container_info(until=until),
1107 'X-Container-Object-Meta')
1109 def set_container_meta(self, metapairs):
1111 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1113 assert(type(metapairs) is dict)
1114 r = self.container_post(update=True, metadata=metapairs)
1117 def del_container_meta(self, metakey):
1119 :param metakey: (str) metadatum key
1121 :returns: (dict) response headers
1123 r = self.container_post(update=True, metadata={metakey: ''})
1126 def set_container_limit(self, limit):
1130 r = self.container_post(update=True, quota=limit)
1133 def set_container_versioning(self, versioning):
1135 :param versioning: (str)
1137 r = self.container_post(update=True, versioning=versioning)
1140 def del_object(self, obj, until=None, delimiter=None):
1142 :param obj: (str) remote object path
1144 :param until: (str) formated date
1146 :param delimiter: (str)
1148 self._assert_container()
1149 r = self.object_delete(obj, until=until, delimiter=delimiter)
1152 def set_object_meta(self, obj, metapairs):
1154 :param obj: (str) remote object path
1156 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1158 assert(type(metapairs) is dict)
1159 r = self.object_post(obj, update=True, metadata=metapairs)
1162 def del_object_meta(self, obj, metakey):
1164 :param obj: (str) remote object path
1166 :param metakey: (str) metadatum key
1168 r = self.object_post(obj, update=True, metadata={metakey: ''})
1171 def publish_object(self, obj):
1173 :param obj: (str) remote object path
1175 :returns: (str) access url
1177 self.object_post(obj, update=True, public=True)
1178 info = self.get_object_info(obj)
1179 return info['x-object-public']
1180 pref, sep, rest = self.base_url.partition('//')
1181 base = rest.split('/')[0]
1182 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1184 def unpublish_object(self, obj):
1186 :param obj: (str) remote object path
1188 r = self.object_post(obj, update=True, public=False)
1191 def get_object_info(self, obj, version=None):
1193 :param obj: (str) remote object path
1195 :param version: (str)
1200 r = self.object_head(obj, version=version)
1202 except ClientError as ce:
1203 if ce.status == 404:
1204 raise ClientError('Object %s not found' % obj, status=404)
1207 def get_object_meta(self, obj, version=None):
1209 :param obj: (str) remote object path
1211 :param version: (str)
1216 self.get_object_info(obj, version=version),
1219 def get_object_sharing(self, obj):
1221 :param obj: (str) remote object path
1226 self.get_object_info(obj),
1231 perms = r['x-object-sharing'].split(';')
1236 raise ClientError('Incorrect reply format')
1237 (key, val) = perm.strip().split('=')
1241 def set_object_sharing(
1243 read_permission=False, write_permission=False):
1244 """Give read/write permisions to an object.
1246 :param obj: (str) remote object path
1248 :param read_permission: (list - bool) users and user groups that get
1249 read permission for this object - False means all previous read
1250 permissions will be removed
1252 :param write_permission: (list - bool) of users and user groups to get
1253 write permission for this object - False means all previous write
1254 permissions will be removed
1256 :returns: (dict) response headers
1259 perms = dict(read=read_permission or '', write=write_permission or '')
1260 r = self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """Remove all read/write permissions from an object (delegates to
        set_object_sharing with both permission lists defaulted off).

        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)
1269 def append_object(self, obj, source_file, upload_cb=None):
1271 :param obj: (str) remote object path
1273 :param source_file: open file descriptor
1275 :param upload_db: progress.bar for uploading
1277 self._assert_container()
1278 meta = self.get_container_info()
1279 blocksize = int(meta['x-container-block-size'])
1280 filesize = fstat(source_file.fileno()).st_size
1281 nblocks = 1 + (filesize - 1) // blocksize
1285 self.progress_bar_gen = upload_cb(nblocks)
1288 self._init_thread_limit()
1290 for i in range(nblocks):
1291 block = source_file.read(min(blocksize, filesize - offset))
1292 offset += len(block)
1294 self._watch_thread_limit(flying.values())
1296 flying[i] = SilentEvent(
1297 method=self.object_post,
1300 content_range='bytes */*',
1301 content_type='application/octet-stream',
1302 content_length=len(block),
1306 for key, thread in flying.items():
1307 if thread.isAlive():
1309 unfinished[key] = thread
1312 if thread.exception:
1313 raise thread.exception
1314 headers[key] = thread.value.headers
1317 except KeyboardInterrupt:
1318 sendlog.info('- - - wait for threads to finish')
1319 for thread in activethreads():
1322 from time import sleep
1323 sleep(2 * len(activethreads()))
1324 return headers.values()
1326 def truncate_object(self, obj, upto_bytes):
1328 :param obj: (str) remote object path
1330 :param upto_bytes: max number of bytes to leave on file
1332 :returns: (dict) response headers
1334 r = self.object_post(
1337 content_range='bytes 0-%s/*' % upto_bytes,
1338 content_type='application/octet-stream',
1339 object_bytes=upto_bytes,
1340 source_object=path4url(self.container, obj))
1343 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1344 """Overwrite a part of an object from local source file
1346 :param obj: (str) remote object path
1348 :param start: (int) position in bytes to start overwriting from
1350 :param end: (int) position in bytes to stop overwriting at
1352 :param source_file: open file descriptor
1354 :param upload_db: progress.bar for uploading
1357 r = self.get_object_info(obj)
1358 rf_size = int(r['content-length'])
1359 if rf_size < int(start):
1361 'Range start exceeds file size',
1363 elif rf_size < int(end):
1365 'Range end exceeds file size',
1367 self._assert_container()
1368 meta = self.get_container_info()
1369 blocksize = int(meta['x-container-block-size'])
1370 filesize = fstat(source_file.fileno()).st_size
1371 datasize = int(end) - int(start) + 1
1372 nblocks = 1 + (datasize - 1) // blocksize
1375 self.progress_bar_gen = upload_cb(nblocks)
1378 for i in range(nblocks):
1379 read_size = min(blocksize, filesize - offset, datasize - offset)
1380 block = source_file.read(read_size)
1381 r = self.object_post(
1384 content_type='application/octet-stream',
1385 content_length=len(block),
1386 content_range='bytes %s-%s/*' % (
1388 start + offset + len(block) - 1),
1390 headers.append(dict(r.headers))
1391 offset += len(block)
1397 self, src_container, src_object, dst_container,
1399 source_version=None,
1400 source_account=None,
1405 :param src_container: (str) source container
1407 :param src_object: (str) source object path
1409 :param dst_container: (str) destination container
1411 :param dst_object: (str) destination object path
1413 :param source_version: (str) source object version
1415 :param source_account: (str) account to copy from
1417 :param public: (bool)
1419 :param content_type: (str)
1421 :param delimiter: (str)
1423 :returns: (dict) response headers
1425 self._assert_account()
1426 self.container = dst_container
1427 src_path = path4url(src_container, src_object)
1428 r = self.object_put(
1429 dst_object or src_object,
1433 source_version=source_version,
1434 source_account=source_account,
1436 content_type=content_type,
1437 delimiter=delimiter)
1441 self, src_container, src_object, dst_container,
1443 source_account=None,
1444 source_version=None,
1449 :param src_container: (str) source container
1451 :param src_object: (str) source object path
1453 :param dst_container: (str) destination container
1455 :param dst_object: (str) destination object path
1457 :param source_account: (str) account to move from
1459 :param source_version: (str) source object version
1461 :param public: (bool)
1463 :param content_type: (str)
1465 :param delimiter: (str)
1467 :returns: (dict) response headers
1469 self._assert_account()
1470 self.container = dst_container
1471 dst_object = dst_object or src_object
1472 src_path = path4url(src_container, src_object)
1473 r = self.object_put(
1478 source_account=source_account,
1479 source_version=source_version,
1481 content_type=content_type,
1482 delimiter=delimiter)
1485 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1486 """Get accounts that share with self.account
1490 :param marker: (str)
1494 self._assert_account()
1496 self.set_param('format', 'json')
1497 self.set_param('limit', limit, iff=limit is not None)
1498 self.set_param('marker', marker, iff=marker is not None)
1501 success = kwargs.pop('success', (200, 204))
1502 r = self.get(path, *args, success=success, **kwargs)
    def get_object_versionlist(self, obj):
        """List all versions of a remote object.

        :param obj: (str) remote object path

        :returns: (list) version entries as reported by the server
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']