1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
def _pithos_hash(block, blockhash):
    """Compute the Pithos+ hash of a data block.

    Trailing NUL padding is stripped before hashing, matching the Pithos+
    hashmap convention (blocks are zero-padded up to blocksize).

    :param block: (str) block contents
    :param blockhash: (str) a hash algorithm name supported by hashlib
    :returns: (str) hex digest of the (rstripped) block
    """
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    # Fix: the digest was computed but never returned; callers use the
    # result as a key into the upload hashmap.
    return h.hexdigest()
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """
        :param base_url: (str) the Pithos+ service endpoint URL
        :param token: (str) user authentication token
        :param account: (str) account identifier; may be set later
        :param container: (str) default container; may be set later
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
76 container=None, sizelimit=None, versioning=None, metadata=None):
78 :param container: (str) if not given, self.container is used instead
80 :param sizelimit: (int) container total size limit in bytes
82 :param versioning: (str) can be auto or whatever supported by server
84 :param metadata: (dict) Custom user-defined metadata of the form
85 { 'name1': 'value1', 'name2': 'value2', ... }
87 :returns: (dict) response headers
89 cnt_back_up = self.container
91 self.container = container or cnt_back_up
92 r = self.container_put(
93 quota=sizelimit, versioning=versioning, metadata=metadata)
96 self.container = cnt_back_up
98 def purge_container(self, container=None):
99 """Delete an empty container and destroy associated blocks
101 cnt_back_up = self.container
103 self.container = container or cnt_back_up
104 r = self.container_delete(until=unicode(time()))
106 self.container = cnt_back_up
109 def upload_object_unchunked(
114 content_encoding=None,
115 content_disposition=None,
120 :param obj: (str) remote object path
122 :param f: open file descriptor
124 :param withHashFile: (bool)
126 :param size: (int) size of data to upload
130 :param content_encoding: (str)
132 :param content_disposition: (str)
134 :param content_type: (str)
136 :param sharing: {'read':[user and/or grp names],
137 'write':[usr and/or grp names]}
139 :param public: (bool)
141 :returns: (dict) created object metadata
143 self._assert_container()
149 data = json.dumps(json.loads(data))
151 raise ClientError('"%s" is not json-formated' % f.name, 1)
153 msg = '"%s" is not a valid hashmap file' % f.name
154 raise ClientError(msg, 1)
157 data = f.read(size) if size else f.read()
162 content_encoding=content_encoding,
163 content_disposition=content_disposition,
164 content_type=content_type,
170 def create_object_by_manifestation(
173 content_encoding=None,
174 content_disposition=None,
179 :param obj: (str) remote object path
183 :param content_encoding: (str)
185 :param content_disposition: (str)
187 :param content_type: (str)
189 :param sharing: {'read':[user and/or grp names],
190 'write':[usr and/or grp names]}
192 :param public: (bool)
194 :returns: (dict) created object metadata
196 self._assert_container()
201 content_encoding=content_encoding,
202 content_disposition=content_disposition,
203 content_type=content_type,
206 manifest='%s/%s' % (self.container, obj))
209 # upload_* auxiliary methods
210 def _put_block_async(self, data, hash):
211 event = SilentEvent(method=self._put_block, data=data, hash=hash)
215 def _put_block(self, data, hash):
216 r = self.container_post(
218 content_type='application/octet-stream',
219 content_length=len(data),
222 assert r.json[0] == hash, 'Local hash does not match server'
224 def _get_file_block_info(self, fileobj, size=None, cache=None):
226 :param fileobj: (file descriptor) source
228 :param size: (int) size of data to upload from source
230 :param cache: (dict) if provided, cache container info response to
231 avoid redundant calls
233 if isinstance(cache, dict):
235 meta = cache[self.container]
237 meta = self.get_container_info()
238 cache[self.container] = meta
240 meta = self.get_container_info()
241 blocksize = int(meta['x-container-block-size'])
242 blockhash = meta['x-container-block-hash']
243 size = size if size is not None else fstat(fileobj.fileno()).st_size
244 nblocks = 1 + (size - 1) // blocksize
245 return (blocksize, blockhash, size, nblocks)
247 def _create_object_or_get_missing_hashes(
254 if_etag_not_match=None,
255 content_encoding=None,
256 content_disposition=None,
264 content_type=content_type,
266 if_etag_match=if_etag_match,
267 if_etag_not_match=if_etag_not_match,
268 content_encoding=content_encoding,
269 content_disposition=content_disposition,
270 permissions=permissions,
273 return (None if r.status_code == 201 else r.json), r.headers
275 def _calculate_blocks_for_upload(
276 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
280 hash_gen = hash_cb(nblocks)
283 for i in range(nblocks):
284 block = fileobj.read(min(blocksize, size - offset))
286 hash = _pithos_hash(block, blockhash)
288 hmap[hash] = (offset, bytes)
292 msg = 'Failed to calculate uploaded blocks:'
293 ' Offset and object size do not match'
294 assert offset == size, msg
296 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
297 """upload missing blocks asynchronously"""
299 self._init_thread_limit()
304 offset, bytes = hmap[hash]
306 data = fileobj.read(bytes)
307 r = self._put_block_async(data, hash)
309 unfinished = self._watch_thread_limit(flying)
310 for thread in set(flying).difference(unfinished):
312 failures.append(thread)
315 ClientError) and thread.exception.status == 502:
316 self.POOLSIZE = self._thread_limit
317 elif thread.isAlive():
318 flying.append(thread)
326 for thread in flying:
329 failures.append(thread)
336 return [failure.kwargs['hash'] for failure in failures]
346 content_encoding=None,
347 content_disposition=None,
351 container_info_cache=None):
352 """Upload an object using multiple connections (threads)
354 :param obj: (str) remote object path
356 :param f: open file descriptor (rb)
358 :param hash_cb: optional progress.bar object for calculating hashes
360 :param upload_cb: optional progress.bar object for uploading
364 :param if_etag_match: (str) Push that value to if-match header at file
367 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
368 it does not exist remotely, otherwise the operation will fail.
369 Involves the case of an object with the same path is created while
370 the object is being uploaded.
372 :param content_encoding: (str)
374 :param content_disposition: (str)
376 :param content_type: (str)
378 :param sharing: {'read':[user and/or grp names],
379 'write':[usr and/or grp names]}
381 :param public: (bool)
383 :param container_info_cache: (dict) if given, avoid redundant calls to
384 server for container info (block size and hash information)
386 self._assert_container()
389 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
390 f, size, container_info_cache)
391 (hashes, hmap, offset) = ([], {}, 0)
393 content_type = 'application/octet-stream'
395 self._calculate_blocks_for_upload(
402 hashmap = dict(bytes=size, hashes=hashes)
403 missing, obj_headers = self._create_object_or_get_missing_hashes(
405 content_type=content_type,
407 if_etag_match=if_etag_match,
408 if_etag_not_match='*' if if_not_exist else None,
409 content_encoding=content_encoding,
410 content_disposition=content_disposition,
418 upload_gen = upload_cb(len(missing))
419 for i in range(len(missing), len(hashmap['hashes']) + 1):
430 sendlog.info('%s blocks missing' % len(missing))
431 num_of_blocks = len(missing)
432 missing = self._upload_missing_blocks(
438 if num_of_blocks == len(missing):
441 num_of_blocks = len(missing)
446 '%s blocks failed to upload' % len(missing),
447 details=['%s' % thread.exception for thread in missing])
448 except KeyboardInterrupt:
449 sendlog.info('- - - wait for threads to finish')
450 for thread in activethreads():
458 content_type=content_type,
459 if_etag_match=if_etag_match,
460 if_etag_not_match='*' if if_not_exist else None,
468 def upload_from_string(
469 self, obj, input_str,
475 content_encoding=None,
476 content_disposition=None,
480 container_info_cache=None):
481 """Upload an object using multiple connections (threads)
483 :param obj: (str) remote object path
485 :param input_str: (str) upload content
487 :param hash_cb: optional progress.bar object for calculating hashes
489 :param upload_cb: optional progress.bar object for uploading
493 :param if_etag_match: (str) Push that value to if-match header at file
496 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
497 it does not exist remotely, otherwise the operation will fail.
498 Involves the case of an object with the same path is created while
499 the object is being uploaded.
501 :param content_encoding: (str)
503 :param content_disposition: (str)
505 :param content_type: (str)
507 :param sharing: {'read':[user and/or grp names],
508 'write':[usr and/or grp names]}
510 :param public: (bool)
512 :param container_info_cache: (dict) if given, avoid redundant calls to
513 server for container info (block size and hash information)
515 self._assert_container()
517 blocksize, blockhash, size, nblocks = self._get_file_block_info(
518 fileobj=None, size=len(input_str), cache=container_info_cache)
519 (hashes, hmap, offset) = ([], {}, 0)
521 content_type = 'application/octet-stream'
523 num_of_blocks, blockmod = size / blocksize, size % blocksize
524 num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod
528 for blockid in range(num_of_blocks):
529 start = blockid * blocksize
530 block = input_str[start: (start + blocksize)]
531 hashes.append(_pithos_hash(block, blockhash))
532 hmap[hashes[blockid]] = (start, block)
534 hashmap = dict(bytes=size, hashes=hashes)
535 missing, obj_headers = self._create_object_or_get_missing_hashes(
537 content_type=content_type,
539 if_etag_match=if_etag_match,
540 if_etag_not_match='*' if if_not_exist else None,
541 content_encoding=content_encoding,
542 content_disposition=content_disposition,
547 num_of_missing = len(missing)
550 self.progress_bar_gen = upload_cb(num_of_blocks)
551 for i in range(num_of_blocks + 1 - num_of_missing):
557 while tries and missing:
561 offset, block = hmap[hash]
562 bird = self._put_block_async(block, hash)
564 unfinished = self._watch_thread_limit(flying)
565 for thread in set(flying).difference(unfinished):
567 failures.append(thread.kwargs['hash'])
569 flying.append(thread)
573 for thread in flying:
576 failures.append(thread.kwargs['hash'])
579 if missing and len(missing) == old_failures:
581 old_failures = len(missing)
584 '%s blocks failed to upload' % len(missing),
585 details=['%s' % thread.exception for thread in missing])
586 except KeyboardInterrupt:
587 sendlog.info('- - - wait for threads to finish')
588 for thread in activethreads():
596 content_type=content_type,
597 if_etag_match=if_etag_match,
598 if_etag_not_match='*' if if_not_exist else None,
606 # download_* auxiliary methods
607 def _get_remote_blocks_info(self, obj, **restargs):
608 #retrieve object hashmap
609 myrange = restargs.pop('data_range', None)
610 hashmap = self.get_object_hashmap(obj, **restargs)
611 restargs['data_range'] = myrange
612 blocksize = int(hashmap['block_size'])
613 blockhash = hashmap['block_hash']
614 total_size = hashmap['bytes']
615 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
617 for i, h in enumerate(hashmap['hashes']):
618 # map_dict[h] = i CHAGE
620 map_dict[h].append(i)
623 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
625 def _dump_blocks_sync(
626 self, obj, remote_hashes, blocksize, total_size, dst, range,
628 for blockid, blockhash in enumerate(remote_hashes):
630 start = blocksize * blockid
631 is_last = start + blocksize > total_size
632 end = (total_size - 1) if is_last else (start + blocksize - 1)
633 (start, end) = _range_up(start, end, range)
634 args['data_range'] = 'bytes=%s-%s' % (start, end)
635 r = self.object_get(obj, success=(200, 206), **args)
640 def _get_block_async(self, obj, **args):
641 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
645 def _hash_from_file(self, fp, start, size, blockhash):
647 block = fp.read(size)
648 h = newhashlib(blockhash)
649 h.update(block.strip('\x00'))
650 return hexlify(h.digest())
652 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
653 """write the results of a greenleted rest call to a file
655 :param offset: the offset of the file up to blocksize
656 - e.g. if the range is 10-100, all blocks will be written to
659 for key, g in flying.items():
664 block = g.value.content
665 for block_start in blockids[key]:
666 local_file.seek(block_start + offset)
667 local_file.write(block)
673 def _dump_blocks_async(
674 self, obj, remote_hashes, blocksize, total_size, local_file,
675 blockhash=None, resume=False, filerange=None, **restargs):
676 file_size = fstat(local_file.fileno()).st_size if resume else 0
678 blockid_dict = dict()
680 if filerange is not None:
681 rstart = int(filerange.split('-')[0])
682 offset = rstart if blocksize > rstart else rstart % blocksize
684 self._init_thread_limit()
685 for block_hash, blockids in remote_hashes.items():
686 blockids = [blk * blocksize for blk in blockids]
687 unsaved = [blk for blk in blockids if not (
688 blk < file_size and block_hash == self._hash_from_file(
689 local_file, blk, blocksize, blockhash))]
690 self._cb_next(len(blockids) - len(unsaved))
693 self._watch_thread_limit(flying.values())
695 flying, blockid_dict, local_file, offset,
697 end = total_size - 1 if (
698 key + blocksize > total_size) else key + blocksize - 1
699 start, end = _range_up(key, end, filerange)
703 restargs['async_headers'] = {
704 'Range': 'bytes=%s-%s' % (start, end)}
705 flying[key] = self._get_block_async(obj, **restargs)
706 blockid_dict[key] = unsaved
708 for thread in flying.values():
710 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
720 if_modified_since=None,
721 if_unmodified_since=None):
722 """Download an object (multiple connections, random blocks)
724 :param obj: (str) remote object path
726 :param dst: open file descriptor (wb+)
728 :param download_cb: optional progress.bar object for downloading
730 :param version: (str) file version
732 :param resume: (bool) if set, preserve already downloaded file parts
734 :param range_str: (str) from, to are file positions (int) in bytes
736 :param if_match: (str)
738 :param if_none_match: (str)
740 :param if_modified_since: (str) formated date
742 :param if_unmodified_since: (str) formated date"""
745 data_range=None if range_str is None else 'bytes=%s' % range_str,
747 if_none_match=if_none_match,
748 if_modified_since=if_modified_since,
749 if_unmodified_since=if_unmodified_since)
756 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
757 assert total_size >= 0
760 self.progress_bar_gen = download_cb(len(hash_list))
764 self._dump_blocks_sync(
773 self._dump_blocks_async(
784 dst.truncate(total_size)
788 def download_to_string(
795 if_modified_since=None,
796 if_unmodified_since=None):
797 """Download an object to a string (multiple connections). This method
798 uses threads for http requests, but stores all content in memory.
800 :param obj: (str) remote object path
802 :param download_cb: optional progress.bar object for downloading
804 :param version: (str) file version
806 :param range_str: (str) from, to are file positions (int) in bytes
808 :param if_match: (str)
810 :param if_none_match: (str)
812 :param if_modified_since: (str) formated date
814 :param if_unmodified_since: (str) formated date
816 :returns: (str) the whole object contents
820 data_range=None if range_str is None else 'bytes=%s' % range_str,
822 if_none_match=if_none_match,
823 if_modified_since=if_modified_since,
824 if_unmodified_since=if_unmodified_since)
831 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
832 assert total_size >= 0
835 self.progress_bar_gen = download_cb(len(hash_list))
838 num_of_blocks = len(remote_hashes)
839 ret = [''] * num_of_blocks
840 self._init_thread_limit()
843 for blockid, blockhash in enumerate(remote_hashes):
844 start = blocksize * blockid
845 is_last = start + blocksize > total_size
846 end = (total_size - 1) if is_last else (start + blocksize - 1)
847 (start, end) = _range_up(start, end, range_str)
849 self._watch_thread_limit(flying.values())
850 flying[blockid] = self._get_block_async(obj, **restargs)
851 for runid, thread in flying.items():
852 if (blockid + 1) == num_of_blocks:
854 elif thread.isAlive():
857 raise thread.exception
858 ret[runid] = thread.value.content
862 except KeyboardInterrupt:
863 sendlog.info('- - - wait for threads to finish')
864 for thread in activethreads():
867 #Command Progress Bar method
868 def _cb_next(self, step=1):
869 if hasattr(self, 'progress_bar_gen'):
871 for i in xrange(step):
872 self.progress_bar_gen.next()
876 def _complete_cb(self):
879 self.progress_bar_gen.next()
883 def get_object_hashmap(
888 if_modified_since=None,
889 if_unmodified_since=None,
892 :param obj: (str) remote object path
894 :param if_match: (str)
896 :param if_none_match: (str)
898 :param if_modified_since: (str) formated date
900 :param if_unmodified_since: (str) formated date
902 :param data_range: (str) from-to where from and to are integers
903 denoting file positions in bytes
912 if_etag_match=if_match,
913 if_etag_not_match=if_none_match,
914 if_modified_since=if_modified_since,
915 if_unmodified_since=if_unmodified_since,
916 data_range=data_range)
917 except ClientError as err:
918 if err.status == 304 or err.status == 412:
923 def set_account_group(self, group, usernames):
927 :param usernames: (list)
929 r = self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Delete an account user group by posting it with no members.

        :param group: (str) group name
        """
        self.account_post(update=True, groups={group: []})
938 def get_account_info(self, until=None):
940 :param until: (str) formated date
944 r = self.account_head(until=until)
945 if r.status_code == 401:
946 raise ClientError("No authorization", status=401)
949 def get_account_quota(self):
954 self.get_account_info(),
955 'X-Account-Policy-Quota',
958 def get_account_versioning(self):
963 self.get_account_info(),
964 'X-Account-Policy-Versioning',
967 def get_account_meta(self, until=None):
969 :meta until: (str) formated date
973 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
975 def get_account_group(self):
979 return filter_in(self.get_account_info(), 'X-Account-Group-')
981 def set_account_meta(self, metapairs):
983 :param metapairs: (dict) {key1:val1, key2:val2, ...}
985 assert(type(metapairs) is dict)
986 r = self.account_post(update=True, metadata=metapairs)
989 def del_account_meta(self, metakey):
991 :param metakey: (str) metadatum key
993 r = self.account_post(update=True, metadata={metakey: ''})
    def set_account_quota(self, quota):
        """Set the account quota policy.

        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)
1004 def set_account_versioning(self, versioning):
1006 "param versioning: (str)
1008 r = self.account_post(update=True, versioning=versioning)
1011 def list_containers(self):
1015 r = self.account_get()
1018 def del_container(self, until=None, delimiter=None):
1020 :param until: (str) formated date
1022 :param delimiter: (str) with / empty container
1024 :raises ClientError: 404 Container does not exist
1026 :raises ClientError: 409 Container is not empty
1028 self._assert_container()
1029 r = self.container_delete(
1031 delimiter=delimiter,
1032 success=(204, 404, 409))
1033 if r.status_code == 404:
1035 'Container "%s" does not exist' % self.container,
1037 elif r.status_code == 409:
1039 'Container "%s" is not empty' % self.container,
1043 def get_container_versioning(self, container=None):
1045 :param container: (str)
1049 cnt_back_up = self.container
1051 self.container = container or cnt_back_up
1053 self.get_container_info(),
1054 'X-Container-Policy-Versioning')
1056 self.container = cnt_back_up
1058 def get_container_limit(self, container=None):
1060 :param container: (str)
1064 cnt_back_up = self.container
1066 self.container = container or cnt_back_up
1068 self.get_container_info(),
1069 'X-Container-Policy-Quota')
1071 self.container = cnt_back_up
1073 def get_container_info(self, until=None):
1075 :param until: (str) formated date
1079 :raises ClientError: 404 Container not found
1082 r = self.container_head(until=until)
1083 except ClientError as err:
1084 err.details.append('for container %s' % self.container)
1088 def get_container_meta(self, until=None):
1090 :param until: (str) formated date
1095 self.get_container_info(until=until),
1098 def get_container_object_meta(self, until=None):
1100 :param until: (str) formated date
1105 self.get_container_info(until=until),
1106 'X-Container-Object-Meta')
1108 def set_container_meta(self, metapairs):
1110 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1112 assert(type(metapairs) is dict)
1113 r = self.container_post(update=True, metadata=metapairs)
1116 def del_container_meta(self, metakey):
1118 :param metakey: (str) metadatum key
1120 :returns: (dict) response headers
1122 r = self.container_post(update=True, metadata={metakey: ''})
1125 def set_container_limit(self, limit):
1129 r = self.container_post(update=True, quota=limit)
1132 def set_container_versioning(self, versioning):
1134 :param versioning: (str)
1136 r = self.container_post(update=True, versioning=versioning)
1139 def del_object(self, obj, until=None, delimiter=None):
1141 :param obj: (str) remote object path
1143 :param until: (str) formated date
1145 :param delimiter: (str)
1147 self._assert_container()
1148 r = self.object_delete(obj, until=until, delimiter=delimiter)
1151 def set_object_meta(self, obj, metapairs):
1153 :param obj: (str) remote object path
1155 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1157 assert(type(metapairs) is dict)
1158 r = self.object_post(obj, update=True, metadata=metapairs)
1161 def del_object_meta(self, obj, metakey):
1163 :param obj: (str) remote object path
1165 :param metakey: (str) metadatum key
1167 r = self.object_post(obj, update=True, metadata={metakey: ''})
    def publish_object(self, obj):
        """Make an object publicly accessible and return its public URL.

        :param obj: (str) remote object path
        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        # Rebuild the public URL on the service host: keep scheme and
        # netloc from base_url, append the x-object-public path
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1182 def unpublish_object(self, obj):
1184 :param obj: (str) remote object path
1186 r = self.object_post(obj, update=True, public=False)
1189 def get_object_info(self, obj, version=None):
1191 :param obj: (str) remote object path
1193 :param version: (str)
1198 r = self.object_head(obj, version=version)
1200 except ClientError as ce:
1201 if ce.status == 404:
1202 raise ClientError('Object %s not found' % obj, status=404)
1205 def get_object_meta(self, obj, version=None):
1207 :param obj: (str) remote object path
1209 :param version: (str)
1214 self.get_object_info(obj, version=version),
1217 def get_object_sharing(self, obj):
1219 :param obj: (str) remote object path
1224 self.get_object_info(obj),
1229 perms = r['x-object-sharing'].split(';')
1234 raise ClientError('Incorrect reply format')
1235 (key, val) = perm.strip().split('=')
1239 def set_object_sharing(
1241 read_permition=False, write_permition=False):
1242 """Give read/write permisions to an object.
1244 :param obj: (str) remote object path
1246 :param read_permition: (list - bool) users and user groups that get
1247 read permition for this object - False means all previous read
1248 permissions will be removed
1250 :param write_perimition: (list - bool) of users and user groups to get
1251 write permition for this object - False means all previous write
1252 permissions will be removed
1254 :returns: (dict) response headers
1257 perms = dict(read=read_permition or '', write=write_permition or '')
1258 r = self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """Remove all read/write permissions from an object.

        Delegates to set_object_sharing with its defaults, which clear
        both permission lists.

        :param obj: (str) remote object path
        :returns: (dict) response headers
        """
        return self.set_object_sharing(obj)
1267 def append_object(self, obj, source_file, upload_cb=None):
1269 :param obj: (str) remote object path
1271 :param source_file: open file descriptor
1273 :param upload_db: progress.bar for uploading
1276 self._assert_container()
1277 meta = self.get_container_info()
1278 blocksize = int(meta['x-container-block-size'])
1279 filesize = fstat(source_file.fileno()).st_size
1280 nblocks = 1 + (filesize - 1) // blocksize
1284 self.progress_bar_gen = upload_cb(nblocks)
1287 self._init_thread_limit()
1289 for i in range(nblocks):
1290 block = source_file.read(min(blocksize, filesize - offset))
1291 offset += len(block)
1293 self._watch_thread_limit(flying.values())
1295 flying[i] = SilentEvent(
1296 method=self.object_post,
1299 content_range='bytes */*',
1300 content_type='application/octet-stream',
1301 content_length=len(block),
1305 for key, thread in flying.items():
1306 if thread.isAlive():
1308 unfinished[key] = thread
1311 if thread.exception:
1312 raise thread.exception
1313 headers[key] = thread.value.headers
1316 except KeyboardInterrupt:
1317 sendlog.info('- - - wait for threads to finish')
1318 for thread in activethreads():
1320 return headers.values()
1322 def truncate_object(self, obj, upto_bytes):
1324 :param obj: (str) remote object path
1326 :param upto_bytes: max number of bytes to leave on file
1328 :returns: (dict) response headers
1330 r = self.object_post(
1333 content_range='bytes 0-%s/*' % upto_bytes,
1334 content_type='application/octet-stream',
1335 object_bytes=upto_bytes,
1336 source_object=path4url(self.container, obj))
1339 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1340 """Overwrite a part of an object from local source file
1342 :param obj: (str) remote object path
1344 :param start: (int) position in bytes to start overwriting from
1346 :param end: (int) position in bytes to stop overwriting at
1348 :param source_file: open file descriptor
1350 :param upload_db: progress.bar for uploading
1353 r = self.get_object_info(obj)
1354 rf_size = int(r['content-length'])
1355 if rf_size < int(start):
1357 'Range start exceeds file size',
1359 elif rf_size < int(end):
1361 'Range end exceeds file size',
1363 self._assert_container()
1364 meta = self.get_container_info()
1365 blocksize = int(meta['x-container-block-size'])
1366 filesize = fstat(source_file.fileno()).st_size
1367 datasize = int(end) - int(start) + 1
1368 nblocks = 1 + (datasize - 1) // blocksize
1371 self.progress_bar_gen = upload_cb(nblocks)
1374 for i in range(nblocks):
1375 read_size = min(blocksize, filesize - offset, datasize - offset)
1376 block = source_file.read(read_size)
1377 r = self.object_post(
1380 content_type='application/octet-stream',
1381 content_length=len(block),
1382 content_range='bytes %s-%s/*' % (
1384 start + offset + len(block) - 1),
1386 headers.append(dict(r.headers))
1387 offset += len(block)
1393 self, src_container, src_object, dst_container,
1395 source_version=None,
1396 source_account=None,
1401 :param src_container: (str) source container
1403 :param src_object: (str) source object path
1405 :param dst_container: (str) destination container
1407 :param dst_object: (str) destination object path
1409 :param source_version: (str) source object version
1411 :param source_account: (str) account to copy from
1413 :param public: (bool)
1415 :param content_type: (str)
1417 :param delimiter: (str)
1419 :returns: (dict) response headers
1421 self._assert_account()
1422 self.container = dst_container
1423 src_path = path4url(src_container, src_object)
1424 r = self.object_put(
1425 dst_object or src_object,
1429 source_version=source_version,
1430 source_account=source_account,
1432 content_type=content_type,
1433 delimiter=delimiter)
1437 self, src_container, src_object, dst_container,
1439 source_account=None,
1440 source_version=None,
1445 :param src_container: (str) source container
1447 :param src_object: (str) source object path
1449 :param dst_container: (str) destination container
1451 :param dst_object: (str) destination object path
1453 :param source_account: (str) account to move from
1455 :param source_version: (str) source object version
1457 :param public: (bool)
1459 :param content_type: (str)
1461 :param delimiter: (str)
1463 :returns: (dict) response headers
1465 self._assert_account()
1466 self.container = dst_container
1467 dst_object = dst_object or src_object
1468 src_path = path4url(src_container, src_object)
1469 r = self.object_put(
1474 source_account=source_account,
1475 source_version=source_version,
1477 content_type=content_type,
1478 delimiter=delimiter)
1481 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1482 """Get accounts that share with self.account
1486 :param marker: (str)
1490 self._assert_account()
1492 self.set_param('format', 'json')
1493 self.set_param('limit', limit, iff=limit is not None)
1494 self.set_param('marker', marker, iff=marker is not None)
1497 success = kwargs.pop('success', (200, 204))
1498 r = self.get(path, *args, success=success, **kwargs)
    def get_object_versionlist(self, obj):
        """List the versions of a remote object.

        :param obj: (str) remote object path
        :returns: the 'versions' entry of the json reply
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']