1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, max_value, a_range):
57 :param start: (int) the window bottom
59 :param end: (int) the window top
61 :param max_value: (int) maximum accepted value
63 :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64 where X: x|x-y|-x where x < y and x, y natural numbers
66 :returns: (str) a range string cut-off for the start-end range
67 an empty response means this window is out of range
69 assert start >= 0, '_range_up called w. start(%s) < 0' % start
70 assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
72 assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
75 return '%s-%s' % (start, end)
77 for some_range in a_range.split(','):
78 v0, sep, v1 = some_range.partition('-')
83 if v1 < start or v0 > end or v1 < v0:
85 v0 = v0 if v0 > start else start
86 v1 = v1 if v1 < end else end
87 selected.append('%s-%s' % (v0, v1))
91 v1 = v0 if v0 <= end else end
92 selected.append('%s-%s' % (start, v1))
95 if max_value - v1 > end:
97 v0 = (max_value - v1) if max_value - v1 > start else start
98 selected.append('%s-%s' % (v0, end))
99 return ','.join(selected)
102 class PithosClient(PithosRestClient):
103 """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize a Pithos+ client; all arguments are forwarded to the
        PithosRestClient base class.

        :param base_url: (str) the Pithos+ service endpoint
        :param token: (str) user authentication token
        :param account: (str) account identifier, if known
        :param container: (str) default container for subsequent calls
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
108 def create_container(
110 container=None, sizelimit=None, versioning=None, metadata=None,
113 :param container: (str) if not given, self.container is used instead
115 :param sizelimit: (int) container total size limit in bytes
117 :param versioning: (str) can be auto or whatever supported by server
119 :param metadata: (dict) Custom user-defined metadata of the form
120 { 'name1': 'value1', 'name2': 'value2', ... }
122 :returns: (dict) response headers
124 cnt_back_up = self.container
126 self.container = container or cnt_back_up
127 r = self.container_put(
128 quota=sizelimit, versioning=versioning, metadata=metadata,
132 self.container = cnt_back_up
134 def purge_container(self, container=None):
135 """Delete an empty container and destroy associated blocks"""
136 cnt_back_up = self.container
138 self.container = container or cnt_back_up
139 r = self.container_delete(until=unicode(time()))
141 self.container = cnt_back_up
144 def upload_object_unchunked(
149 content_encoding=None,
150 content_disposition=None,
155 :param obj: (str) remote object path
157 :param f: open file descriptor
159 :param withHashFile: (bool)
161 :param size: (int) size of data to upload
165 :param content_encoding: (str)
167 :param content_disposition: (str)
169 :param content_type: (str)
171 :param sharing: {'read':[user and/or grp names],
172 'write':[usr and/or grp names]}
174 :param public: (bool)
176 :returns: (dict) created object metadata
178 self._assert_container()
184 data = json.dumps(json.loads(data))
186 raise ClientError('"%s" is not json-formated' % f.name, 1)
188 msg = '"%s" is not a valid hashmap file' % f.name
189 raise ClientError(msg, 1)
192 data = readall(f, size) if size else f.read()
197 content_encoding=content_encoding,
198 content_disposition=content_disposition,
199 content_type=content_type,
205 def create_object_by_manifestation(
208 content_encoding=None,
209 content_disposition=None,
214 :param obj: (str) remote object path
218 :param content_encoding: (str)
220 :param content_disposition: (str)
222 :param content_type: (str)
224 :param sharing: {'read':[user and/or grp names],
225 'write':[usr and/or grp names]}
227 :param public: (bool)
229 :returns: (dict) created object metadata
231 self._assert_container()
236 content_encoding=content_encoding,
237 content_disposition=content_disposition,
238 content_type=content_type,
241 manifest='%s/%s' % (self.container, obj))
244 # upload_* auxiliary methods
245 def _put_block_async(self, data, hash):
246 event = SilentEvent(method=self._put_block, data=data, hash=hash)
250 def _put_block(self, data, hash):
251 r = self.container_post(
253 content_type='application/octet-stream',
254 content_length=len(data),
257 assert r.json[0] == hash, 'Local hash does not match server'
259 def _get_file_block_info(self, fileobj, size=None, cache=None):
261 :param fileobj: (file descriptor) source
263 :param size: (int) size of data to upload from source
265 :param cache: (dict) if provided, cache container info response to
266 avoid redundant calls
268 if isinstance(cache, dict):
270 meta = cache[self.container]
272 meta = self.get_container_info()
273 cache[self.container] = meta
275 meta = self.get_container_info()
276 blocksize = int(meta['x-container-block-size'])
277 blockhash = meta['x-container-block-hash']
278 size = size if size is not None else fstat(fileobj.fileno()).st_size
279 nblocks = 1 + (size - 1) // blocksize
280 return (blocksize, blockhash, size, nblocks)
282 def _create_object_or_get_missing_hashes(
289 if_etag_not_match=None,
290 content_encoding=None,
291 content_disposition=None,
299 content_type=content_type,
301 if_etag_match=if_etag_match,
302 if_etag_not_match=if_etag_not_match,
303 content_encoding=content_encoding,
304 content_disposition=content_disposition,
305 permissions=permissions,
308 return (None if r.status_code == 201 else r.json), r.headers
310 def _calculate_blocks_for_upload(
311 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
315 hash_gen = hash_cb(nblocks)
318 for i in xrange(nblocks):
319 block = readall(fileobj, min(blocksize, size - offset))
323 hash = _pithos_hash(block, blockhash)
325 hmap[hash] = (offset, bytes)
329 msg = ('Failed to calculate uploading blocks: '
330 'read bytes(%s) != requested size (%s)' % (offset, size))
331 assert offset == size, msg
333 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
334 """upload missing blocks asynchronously"""
336 self._init_thread_limit()
341 offset, bytes = hmap[hash]
343 data = readall(fileobj, bytes)
344 r = self._put_block_async(data, hash)
346 unfinished = self._watch_thread_limit(flying)
347 for thread in set(flying).difference(unfinished):
349 failures.append(thread)
352 ClientError) and thread.exception.status == 502:
353 self.POOLSIZE = self._thread_limit
354 elif thread.isAlive():
355 flying.append(thread)
363 for thread in flying:
366 failures.append(thread)
373 return [failure.kwargs['hash'] for failure in failures]
383 content_encoding=None,
384 content_disposition=None,
388 container_info_cache=None):
389 """Upload an object using multiple connections (threads)
391 :param obj: (str) remote object path
393 :param f: open file descriptor (rb)
395 :param hash_cb: optional progress.bar object for calculating hashes
397 :param upload_cb: optional progress.bar object for uploading
401 :param if_etag_match: (str) Push that value to if-match header at file
404 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
405 it does not exist remotely, otherwise the operation will fail.
406 Involves the case of an object with the same path is created while
407 the object is being uploaded.
409 :param content_encoding: (str)
411 :param content_disposition: (str)
413 :param content_type: (str)
415 :param sharing: {'read':[user and/or grp names],
416 'write':[usr and/or grp names]}
418 :param public: (bool)
420 :param container_info_cache: (dict) if given, avoid redundant calls to
421 server for container info (block size and hash information)
423 self._assert_container()
426 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
427 f, size, container_info_cache)
428 (hashes, hmap, offset) = ([], {}, 0)
430 content_type = 'application/octet-stream'
432 self._calculate_blocks_for_upload(
439 hashmap = dict(bytes=size, hashes=hashes)
440 missing, obj_headers = self._create_object_or_get_missing_hashes(
442 content_type=content_type,
444 if_etag_match=if_etag_match,
445 if_etag_not_match='*' if if_not_exist else None,
446 content_encoding=content_encoding,
447 content_disposition=content_disposition,
455 upload_gen = upload_cb(len(missing))
456 for i in range(len(missing), len(hashmap['hashes']) + 1):
467 sendlog.info('%s blocks missing' % len(missing))
468 num_of_blocks = len(missing)
469 missing = self._upload_missing_blocks(
475 if num_of_blocks == len(missing):
478 num_of_blocks = len(missing)
483 details = ['%s' % thread.exception for thread in missing]
485 details = ['Also, failed to read thread exceptions']
487 '%s blocks failed to upload' % len(missing),
489 except KeyboardInterrupt:
490 sendlog.info('- - - wait for threads to finish')
491 for thread in activethreads():
499 content_type=content_type,
500 content_encoding=content_encoding,
501 if_etag_match=if_etag_match,
502 if_etag_not_match='*' if if_not_exist else None,
510 def upload_from_string(
511 self, obj, input_str,
517 content_encoding=None,
518 content_disposition=None,
522 container_info_cache=None):
523 """Upload an object using multiple connections (threads)
525 :param obj: (str) remote object path
527 :param input_str: (str) upload content
529 :param hash_cb: optional progress.bar object for calculating hashes
531 :param upload_cb: optional progress.bar object for uploading
535 :param if_etag_match: (str) Push that value to if-match header at file
538 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
539 it does not exist remotely, otherwise the operation will fail.
540 Involves the case of an object with the same path is created while
541 the object is being uploaded.
543 :param content_encoding: (str)
545 :param content_disposition: (str)
547 :param content_type: (str)
549 :param sharing: {'read':[user and/or grp names],
550 'write':[usr and/or grp names]}
552 :param public: (bool)
554 :param container_info_cache: (dict) if given, avoid redundant calls to
555 server for container info (block size and hash information)
557 self._assert_container()
559 blocksize, blockhash, size, nblocks = self._get_file_block_info(
560 fileobj=None, size=len(input_str), cache=container_info_cache)
561 (hashes, hmap, offset) = ([], {}, 0)
563 content_type = 'application/octet-stream'
567 for blockid in range(nblocks):
568 start = blockid * blocksize
569 block = input_str[start: (start + blocksize)]
570 hashes.append(_pithos_hash(block, blockhash))
571 hmap[hashes[blockid]] = (start, block)
573 hashmap = dict(bytes=size, hashes=hashes)
574 missing, obj_headers = self._create_object_or_get_missing_hashes(
576 content_type=content_type,
578 if_etag_match=if_etag_match,
579 if_etag_not_match='*' if if_not_exist else None,
580 content_encoding=content_encoding,
581 content_disposition=content_disposition,
586 num_of_missing = len(missing)
589 self.progress_bar_gen = upload_cb(nblocks)
590 for i in range(nblocks + 1 - num_of_missing):
596 while tries and missing:
600 offset, block = hmap[hash]
601 bird = self._put_block_async(block, hash)
603 unfinished = self._watch_thread_limit(flying)
604 for thread in set(flying).difference(unfinished):
606 failures.append(thread.kwargs['hash'])
608 flying.append(thread)
612 for thread in flying:
615 failures.append(thread.kwargs['hash'])
618 if missing and len(missing) == old_failures:
620 old_failures = len(missing)
622 raise ClientError('%s blocks failed to upload' % len(missing))
623 except KeyboardInterrupt:
624 sendlog.info('- - - wait for threads to finish')
625 for thread in activethreads():
633 content_type=content_type,
634 content_encoding=content_encoding,
635 if_etag_match=if_etag_match,
636 if_etag_not_match='*' if if_not_exist else None,
644 # download_* auxiliary methods
645 def _get_remote_blocks_info(self, obj, **restargs):
646 #retrieve object hashmap
647 myrange = restargs.pop('data_range', None)
648 hashmap = self.get_object_hashmap(obj, **restargs)
649 restargs['data_range'] = myrange
650 blocksize = int(hashmap['block_size'])
651 blockhash = hashmap['block_hash']
652 total_size = hashmap['bytes']
653 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
655 for i, h in enumerate(hashmap['hashes']):
656 # map_dict[h] = i CHAGE
658 map_dict[h].append(i)
661 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
663 def _dump_blocks_sync(
664 self, obj, remote_hashes, blocksize, total_size, dst, crange,
668 for blockid, blockhash in enumerate(remote_hashes):
670 start = blocksize * blockid
671 is_last = start + blocksize > total_size
672 end = (total_size - 1) if is_last else (start + blocksize - 1)
673 data_range = _range_up(start, end, total_size, crange)
677 args['data_range'] = 'bytes=%s' % data_range
678 r = self.object_get(obj, success=(200, 206), **args)
683 def _get_block_async(self, obj, **args):
684 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
688 def _hash_from_file(self, fp, start, size, blockhash):
690 block = readall(fp, size)
691 h = newhashlib(blockhash)
692 h.update(block.strip('\x00'))
693 return hexlify(h.digest())
695 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
696 """write the results of a greenleted rest call to a file
698 :param offset: the offset of the file up to blocksize
699 - e.g. if the range is 10-100, all blocks will be written to
702 for key, g in flying.items():
707 block = g.value.content
708 for block_start in blockids[key]:
709 local_file.seek(block_start + offset)
710 local_file.write(block)
716 def _dump_blocks_async(
717 self, obj, remote_hashes, blocksize, total_size, local_file,
718 blockhash=None, resume=False, filerange=None, **restargs):
719 file_size = fstat(local_file.fileno()).st_size if resume else 0
721 blockid_dict = dict()
724 self._init_thread_limit()
725 for block_hash, blockids in remote_hashes.items():
726 blockids = [blk * blocksize for blk in blockids]
727 unsaved = [blk for blk in blockids if not (
728 blk < file_size and block_hash == self._hash_from_file(
729 local_file, blk, blocksize, blockhash))]
730 self._cb_next(len(blockids) - len(unsaved))
733 self._watch_thread_limit(flying.values())
735 flying, blockid_dict, local_file, offset,
737 end = total_size - 1 if (
738 key + blocksize > total_size) else key + blocksize - 1
742 data_range = _range_up(key, end, total_size, filerange)
747 'async_headers'] = {'Range': 'bytes=%s' % data_range}
748 flying[key] = self._get_block_async(obj, **restargs)
749 blockid_dict[key] = unsaved
751 for thread in flying.values():
753 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
763 if_modified_since=None,
764 if_unmodified_since=None):
765 """Download an object (multiple connections, random blocks)
767 :param obj: (str) remote object path
769 :param dst: open file descriptor (wb+)
771 :param download_cb: optional progress.bar object for downloading
773 :param version: (str) file version
775 :param resume: (bool) if set, preserve already downloaded file parts
777 :param range_str: (str) from, to are file positions (int) in bytes
779 :param if_match: (str)
781 :param if_none_match: (str)
783 :param if_modified_since: (str) formated date
785 :param if_unmodified_since: (str) formated date"""
788 data_range=None if range_str is None else 'bytes=%s' % range_str,
790 if_none_match=if_none_match,
791 if_modified_since=if_modified_since,
792 if_unmodified_since=if_unmodified_since)
799 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
800 assert total_size >= 0
803 self.progress_bar_gen = download_cb(len(hash_list))
807 self._dump_blocks_sync(
816 self._dump_blocks_async(
827 dst.truncate(total_size)
831 def download_to_string(
838 if_modified_since=None,
839 if_unmodified_since=None):
840 """Download an object to a string (multiple connections). This method
841 uses threads for http requests, but stores all content in memory.
843 :param obj: (str) remote object path
845 :param download_cb: optional progress.bar object for downloading
847 :param version: (str) file version
849 :param range_str: (str) from, to are file positions (int) in bytes
851 :param if_match: (str)
853 :param if_none_match: (str)
855 :param if_modified_since: (str) formated date
857 :param if_unmodified_since: (str) formated date
859 :returns: (str) the whole object contents
863 data_range=None if range_str is None else 'bytes=%s' % range_str,
865 if_none_match=if_none_match,
866 if_modified_since=if_modified_since,
867 if_unmodified_since=if_unmodified_since)
874 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
875 assert total_size >= 0
878 self.progress_bar_gen = download_cb(len(hash_list))
881 num_of_blocks = len(remote_hashes)
882 ret = [''] * num_of_blocks
883 self._init_thread_limit()
886 for blockid, blockhash in enumerate(remote_hashes):
887 start = blocksize * blockid
888 is_last = start + blocksize > total_size
889 end = (total_size - 1) if is_last else (start + blocksize - 1)
890 data_range_str = _range_up(start, end, end, range_str)
892 self._watch_thread_limit(flying.values())
893 restargs['data_range'] = 'bytes=%s' % data_range_str
894 flying[blockid] = self._get_block_async(obj, **restargs)
895 for runid, thread in flying.items():
896 if (blockid + 1) == num_of_blocks:
898 elif thread.isAlive():
901 raise thread.exception
902 ret[runid] = thread.value.content
906 except KeyboardInterrupt:
907 sendlog.info('- - - wait for threads to finish')
908 for thread in activethreads():
911 #Command Progress Bar method
912 def _cb_next(self, step=1):
913 if hasattr(self, 'progress_bar_gen'):
915 for i in xrange(step):
916 self.progress_bar_gen.next()
920 def _complete_cb(self):
923 self.progress_bar_gen.next()
927 def get_object_hashmap(
932 if_modified_since=None,
933 if_unmodified_since=None):
935 :param obj: (str) remote object path
937 :param if_match: (str)
939 :param if_none_match: (str)
941 :param if_modified_since: (str) formated date
943 :param if_unmodified_since: (str) formated date
952 if_etag_match=if_match,
953 if_etag_not_match=if_none_match,
954 if_modified_since=if_modified_since,
955 if_unmodified_since=if_unmodified_since)
956 except ClientError as err:
957 if err.status == 304 or err.status == 412:
962 def set_account_group(self, group, usernames):
966 :param usernames: (list)
968 r = self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Remove a user group definition from the account.

        :param group: (str) group name to delete
        """
        # Posting an empty member list removes the group server-side
        self.account_post(update=True, groups={group: []})
977 def get_account_info(self, until=None):
979 :param until: (str) formated date
983 r = self.account_head(until=until)
984 if r.status_code == 401:
985 raise ClientError("No authorization", status=401)
988 def get_account_quota(self):
993 self.get_account_info(),
994 'X-Account-Policy-Quota',
997 #def get_account_versioning(self):
1002 # self.get_account_info(),
1003 # 'X-Account-Policy-Versioning',
1006 def get_account_meta(self, until=None):
1008 :param until: (str) formated date
1012 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1014 def get_account_group(self):
1018 return filter_in(self.get_account_info(), 'X-Account-Group-')
1020 def set_account_meta(self, metapairs):
1022 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1024 assert(type(metapairs) is dict)
1025 r = self.account_post(update=True, metadata=metapairs)
1028 def del_account_meta(self, metakey):
1030 :param metakey: (str) metadatum key
1032 r = self.account_post(update=True, metadata={metakey: ''})
1035 #def set_account_quota(self, quota):
1037 # :param quota: (int)
1039 # self.account_post(update=True, quota=quota)
1041 #def set_account_versioning(self, versioning):
1043 # :param versioning: (str)
1045 # r = self.account_post(update=True, versioning=versioning)
1048 def list_containers(self):
1052 r = self.account_get()
1055 def del_container(self, until=None, delimiter=None):
1057 :param until: (str) formated date
1059 :param delimiter: (str) with / empty container
1061 :raises ClientError: 404 Container does not exist
1063 :raises ClientError: 409 Container is not empty
1065 self._assert_container()
1066 r = self.container_delete(
1068 delimiter=delimiter,
1069 success=(204, 404, 409))
1070 if r.status_code == 404:
1072 'Container "%s" does not exist' % self.container,
1074 elif r.status_code == 409:
1076 'Container "%s" is not empty' % self.container,
1080 def get_container_versioning(self, container=None):
1082 :param container: (str)
1086 cnt_back_up = self.container
1088 self.container = container or cnt_back_up
1090 self.get_container_info(),
1091 'X-Container-Policy-Versioning')
1093 self.container = cnt_back_up
1095 def get_container_limit(self, container=None):
1097 :param container: (str)
1101 cnt_back_up = self.container
1103 self.container = container or cnt_back_up
1105 self.get_container_info(),
1106 'X-Container-Policy-Quota')
1108 self.container = cnt_back_up
1110 def get_container_info(self, container=None, until=None):
1112 :param until: (str) formated date
1116 :raises ClientError: 404 Container not found
1118 bck_cont = self.container
1120 self.container = container or bck_cont
1121 self._assert_container()
1122 r = self.container_head(until=until)
1123 except ClientError as err:
1124 err.details.append('for container %s' % self.container)
1127 self.container = bck_cont
1130 def get_container_meta(self, until=None):
1132 :param until: (str) formated date
1137 self.get_container_info(until=until), 'X-Container-Meta')
1139 def get_container_object_meta(self, until=None):
1141 :param until: (str) formated date
1146 self.get_container_info(until=until), 'X-Container-Object-Meta')
1148 def set_container_meta(self, metapairs):
1150 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1152 assert(type(metapairs) is dict)
1153 r = self.container_post(update=True, metadata=metapairs)
1156 def del_container_meta(self, metakey):
1158 :param metakey: (str) metadatum key
1160 :returns: (dict) response headers
1162 r = self.container_post(update=True, metadata={metakey: ''})
1165 def set_container_limit(self, limit):
1169 r = self.container_post(update=True, quota=limit)
1172 def set_container_versioning(self, versioning):
1174 :param versioning: (str)
1176 r = self.container_post(update=True, versioning=versioning)
1179 def del_object(self, obj, until=None, delimiter=None):
1181 :param obj: (str) remote object path
1183 :param until: (str) formated date
1185 :param delimiter: (str)
1187 self._assert_container()
1188 r = self.object_delete(obj, until=until, delimiter=delimiter)
1191 def set_object_meta(self, obj, metapairs):
1193 :param obj: (str) remote object path
1195 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1197 assert(type(metapairs) is dict)
1198 r = self.object_post(obj, update=True, metadata=metapairs)
1201 def del_object_meta(self, obj, metakey):
1203 :param obj: (str) remote object path
1205 :param metakey: (str) metadatum key
1207 r = self.object_post(obj, update=True, metadata={metakey: ''})
1210 def publish_object(self, obj):
1212 :param obj: (str) remote object path
1214 :returns: (str) access url
1216 self.object_post(obj, update=True, public=True)
1217 info = self.get_object_info(obj)
1218 return info['x-object-public']
1219 pref, sep, rest = self.base_url.partition('//')
1220 base = rest.split('/')[0]
1221 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1223 def unpublish_object(self, obj):
1225 :param obj: (str) remote object path
1227 r = self.object_post(obj, update=True, public=False)
1230 def get_object_info(self, obj, version=None):
1232 :param obj: (str) remote object path
1234 :param version: (str)
1239 r = self.object_head(obj, version=version)
1241 except ClientError as ce:
1242 if ce.status == 404:
1243 raise ClientError('Object %s not found' % obj, status=404)
1246 def get_object_meta(self, obj, version=None):
1248 :param obj: (str) remote object path
1250 :param version: (str)
1255 self.get_object_info(obj, version=version),
1258 def get_object_sharing(self, obj):
1260 :param obj: (str) remote object path
1265 self.get_object_info(obj),
1270 perms = r['x-object-sharing'].split(';')
1275 raise ClientError('Incorrect reply format')
1276 (key, val) = perm.strip().split('=')
1280 def set_object_sharing(
1282 read_permission=False, write_permission=False):
1283 """Give read/write permisions to an object.
1285 :param obj: (str) remote object path
1287 :param read_permission: (list - bool) users and user groups that get
1288 read permission for this object - False means all previous read
1289 permissions will be removed
1291 :param write_permission: (list - bool) of users and user groups to get
1292 write permission for this object - False means all previous write
1293 permissions will be removed
1295 :returns: (dict) response headers
1298 perms = dict(read=read_permission or '', write=write_permission or '')
1299 r = self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """Strip all sharing permissions from an object.

        :param obj: (str) remote object path

        :returns: (dict) response headers
        """
        # Calling set_object_sharing with its defaults (False, False)
        # clears both read and write permissions
        return self.set_object_sharing(obj)
1308 def append_object(self, obj, source_file, upload_cb=None):
1310 :param obj: (str) remote object path
1312 :param source_file: open file descriptor
1314 :param upload_db: progress.bar for uploading
1316 self._assert_container()
1317 meta = self.get_container_info()
1318 blocksize = int(meta['x-container-block-size'])
1319 filesize = fstat(source_file.fileno()).st_size
1320 nblocks = 1 + (filesize - 1) // blocksize
1324 self.progress_bar_gen = upload_cb(nblocks)
1327 self._init_thread_limit()
1329 for i in range(nblocks):
1330 block = source_file.read(min(blocksize, filesize - offset))
1331 offset += len(block)
1333 self._watch_thread_limit(flying.values())
1335 flying[i] = SilentEvent(
1336 method=self.object_post,
1339 content_range='bytes */*',
1340 content_type='application/octet-stream',
1341 content_length=len(block),
1345 for key, thread in flying.items():
1346 if thread.isAlive():
1348 unfinished[key] = thread
1351 if thread.exception:
1352 raise thread.exception
1353 headers[key] = thread.value.headers
1356 except KeyboardInterrupt:
1357 sendlog.info('- - - wait for threads to finish')
1358 for thread in activethreads():
1361 from time import sleep
1362 sleep(2 * len(activethreads()))
1363 return headers.values()
1365 def truncate_object(self, obj, upto_bytes):
1367 :param obj: (str) remote object path
1369 :param upto_bytes: max number of bytes to leave on file
1371 :returns: (dict) response headers
1373 ctype = self.get_object_info(obj)['content-type']
1374 r = self.object_post(
1377 content_range='bytes 0-%s/*' % upto_bytes,
1379 object_bytes=upto_bytes,
1380 source_object=path4url(self.container, obj))
1383 def overwrite_object(
1384 self, obj, start, end, source_file,
1385 source_version=None, upload_cb=None):
1386 """Overwrite a part of an object from local source file
1387 ATTENTION: content_type must always be application/octet-stream
1389 :param obj: (str) remote object path
1391 :param start: (int) position in bytes to start overwriting from
1393 :param end: (int) position in bytes to stop overwriting at
1395 :param source_file: open file descriptor
1397 :param upload_db: progress.bar for uploading
1400 self._assert_container()
1401 r = self.get_object_info(obj, version=source_version)
1402 rf_size = int(r['content-length'])
1403 start, end = int(start), int(end)
1404 assert rf_size >= start, 'Range start %s exceeds file size %s' % (
1406 meta = self.get_container_info()
1407 blocksize = int(meta['x-container-block-size'])
1408 filesize = fstat(source_file.fileno()).st_size
1409 datasize = end - start + 1
1410 nblocks = 1 + (datasize - 1) // blocksize
1413 self.progress_bar_gen = upload_cb(nblocks)
1416 for i in range(nblocks):
1417 read_size = min(blocksize, filesize - offset, datasize - offset)
1418 block = source_file.read(read_size)
1419 r = self.object_post(
1422 content_type='application/octet-stream',
1423 content_length=len(block),
1424 content_range='bytes %s-%s/*' % (
1426 start + offset + len(block) - 1),
1427 source_version=source_version,
1429 headers.append(dict(r.headers))
1430 offset += len(block)
1436 self, src_container, src_object, dst_container,
1438 source_version=None,
1439 source_account=None,
1444 :param src_container: (str) source container
1446 :param src_object: (str) source object path
1448 :param dst_container: (str) destination container
1450 :param dst_object: (str) destination object path
1452 :param source_version: (str) source object version
1454 :param source_account: (str) account to copy from
1456 :param public: (bool)
1458 :param content_type: (str)
1460 :param delimiter: (str)
1462 :returns: (dict) response headers
1464 self._assert_account()
1465 self.container = dst_container
1466 src_path = path4url(src_container, src_object)
1467 r = self.object_put(
1468 dst_object or src_object,
1472 source_version=source_version,
1473 source_account=source_account,
1475 content_type=content_type,
1476 delimiter=delimiter)
1480 self, src_container, src_object, dst_container,
1482 source_account=None,
1483 source_version=None,
1488 :param src_container: (str) source container
1490 :param src_object: (str) source object path
1492 :param dst_container: (str) destination container
1494 :param dst_object: (str) destination object path
1496 :param source_account: (str) account to move from
1498 :param source_version: (str) source object version
1500 :param public: (bool)
1502 :param content_type: (str)
1504 :param delimiter: (str)
1506 :returns: (dict) response headers
1508 self._assert_account()
1509 self.container = dst_container
1510 dst_object = dst_object or src_object
1511 src_path = path4url(src_container, src_object)
1512 r = self.object_put(
1517 source_account=source_account,
1518 source_version=source_version,
1520 content_type=content_type,
1521 delimiter=delimiter)
1524 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1525 """Get accounts that share with self.account
1529 :param marker: (str)
1533 self._assert_account()
1535 self.set_param('format', 'json')
1536 self.set_param('limit', limit, iff=limit is not None)
1537 self.set_param('marker', marker, iff=marker is not None)
1540 success = kwargs.pop('success', (200, 204))
1541 r = self.get(path, *args, success=success, **kwargs)
1544 def get_object_versionlist(self, obj):
1546 :param obj: (str) remote object path
1550 self._assert_container()
1551 r = self.object_get(obj, format='json', version='list')
1552 return r.json['versions']