1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
# Compute the Pithos+ content hash of a single data block: a digest (named by
# blockhash, e.g. the container's x-container-block-hash algorithm) of the
# block with trailing NUL padding stripped.
# NOTE(review): lines are elided in this excerpt (embedded numbering jumps);
# the function's return statement is not visible here.
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
# rstrip('\x00'): trailing zero padding is excluded from the hash
51 h.update(block.rstrip('\x00'))
# Intersect a user-supplied range expression with the [start, end] window of a
# single block; the result is the sub-ranges (comma-joined) that fall inside
# the window, or '' when the window is entirely out of range.
# NOTE(review): several lines are elided here (embedded numbering jumps), so
# the branch structure below is only partially visible.
55 def _range_up(start, end, max_value, a_range):
57 :param start: (int) the window bottom
59 :param end: (int) the window top
61 :param max_value: (int) maximum accepted value
63 :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64 where X: x|x-y|-x where x < y and x, y natural numbers
66 :returns: (str) a range string cut-off for the start-end range
67 an empty response means this window is out of range
69 assert start >= 0, '_range_up called w. start(%s) < 0' % start
70 assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
72 assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
75 return '%s-%s' % (start, end)
77 for some_range in a_range.split(','):
78 v0, sep, v1 = some_range.partition('-')
# 'x-y' form: skip sub-ranges that do not overlap the window, then clamp
83 if v1 < start or v0 > end or v1 < v0:
85 v0 = v0 if v0 > start else start
86 v1 = v1 if v1 < end else end
87 selected.append('%s-%s' % (v0, v1))
# NOTE(review): context elided -- these branches appear to handle the
# open-ended 'x-' and suffix '-x' forms; confirm against the full source.
91 v1 = v0 if v0 <= end else end
92 selected.append('%s-%s' % (start, v1))
95 if max_value - v1 > end:
97 v0 = (max_value - v1) if max_value - v1 > start else start
98 selected.append('%s-%s' % (v0, end))
99 return ','.join(selected)
# High-level Pithos+ client: builds on the low-level REST methods of
# PithosRestClient (account_*/container_*/object_*) to provide chunked,
# threaded uploads/downloads and metadata/sharing helpers.
102 class PithosClient(PithosRestClient):
103 """Synnefo Pithos+ API client"""
# Thin constructor: delegates straight to PithosRestClient.__init__ with the
# same arguments (API endpoint, auth token, optional account and container).
105 def __init__(self, base_url, token, account=None, container=None):
106 super(PithosClient, self).__init__(base_url, token, account, container)
# Create a container via PUT. Temporarily points self.container at the target
# (backing up the current value) and restores the backup afterwards -- the
# try/finally lines around this are elided in this excerpt.
108 def create_container(
110 container=None, sizelimit=None, versioning=None, metadata=None,
113 :param container: (str) if not given, self.container is used instead
115 :param sizelimit: (int) container total size limit in bytes
117 :param versioning: (str) can be auto or whatever supported by server
119 :param metadata: (dict) Custom user-defined metadata of the form
120 { 'name1': 'value1', 'name2': 'value2', ... }
122 :returns: (dict) response headers
124 cnt_back_up = self.container
126 self.container = container or cnt_back_up
127 r = self.container_put(
128 quota=sizelimit, versioning=versioning, metadata=metadata,
132 self.container = cnt_back_up
# Purge a container: DELETE with until=<now> so all versions/blocks up to the
# current time are destroyed. Uses the same backup/restore pattern for
# self.container as create_container. `unicode` marks this as Python 2 code.
134 def purge_container(self, container=None):
135 """Delete an empty container and destroy associated blocks
137 cnt_back_up = self.container
139 self.container = container or cnt_back_up
140 r = self.container_delete(until=unicode(time()))
142 self.container = cnt_back_up
# Upload an object in a single PUT (no block splitting). When a hashmap file
# is given, its content is validated as JSON before upload; otherwise raw file
# data is read (readall for a bounded size, f.read() for the whole file).
# NOTE(review): `json` is used below but its import is in elided lines.
145 def upload_object_unchunked(
150 content_encoding=None,
151 content_disposition=None,
156 :param obj: (str) remote object path
158 :param f: open file descriptor
160 :param withHashFile: (bool)
162 :param size: (int) size of data to upload
166 :param content_encoding: (str)
168 :param content_disposition: (str)
170 :param content_type: (str)
172 :param sharing: {'read':[user and/or grp names],
173 'write':[usr and/or grp names]}
175 :param public: (bool)
177 :returns: (dict) created object metadata
179 self._assert_container()
# round-trip through json to validate the hashmap file content
185 data = json.dumps(json.loads(data))
187 raise ClientError('"%s" is not json-formated' % f.name, 1)
189 msg = '"%s" is not a valid hashmap file' % f.name
190 raise ClientError(msg, 1)
193 data = readall(f, size) if size else f.read()
198 content_encoding=content_encoding,
199 content_disposition=content_disposition,
200 content_type=content_type,
# Create a zero-length manifest object whose x-object-manifest points at
# '<container>/<obj>' so the server concatenates the matching parts on GET.
206 def create_object_by_manifestation(
209 content_encoding=None,
210 content_disposition=None,
215 :param obj: (str) remote object path
219 :param content_encoding: (str)
221 :param content_disposition: (str)
223 :param content_type: (str)
225 :param sharing: {'read':[user and/or grp names],
226 'write':[usr and/or grp names]}
228 :param public: (bool)
230 :returns: (dict) created object metadata
232 self._assert_container()
237 content_encoding=content_encoding,
238 content_disposition=content_disposition,
239 content_type=content_type,
242 manifest='%s/%s' % (self.container, obj))
245 # upload_* auxiliary methods
# Wrap _put_block in a SilentEvent thread for asynchronous block upload.
# NOTE(review): the lines that start the event and return it are elided here.
246 def _put_block_async(self, data, hash):
247 event = SilentEvent(method=self._put_block, data=data, hash=hash)
# Upload one raw block to the container (octet-stream POST) and verify the
# server-computed hash echoes the locally computed one.
251 def _put_block(self, data, hash):
252 r = self.container_post(
254 content_type='application/octet-stream',
255 content_length=len(data),
258 assert r.json[0] == hash, 'Local hash does not match server'
# Fetch the container's block parameters and compute how many blocks the
# source needs. Container info may be memoized in the caller-supplied cache
# dict, keyed by container name. fstat is used when size is not given
# (its import is in elided lines).
260 def _get_file_block_info(self, fileobj, size=None, cache=None):
262 :param fileobj: (file descriptor) source
264 :param size: (int) size of data to upload from source
266 :param cache: (dict) if provided, cache container info response to
267 avoid redundant calls
269 if isinstance(cache, dict):
271 meta = cache[self.container]
273 meta = self.get_container_info()
274 cache[self.container] = meta
276 meta = self.get_container_info()
277 blocksize = int(meta['x-container-block-size'])
278 blockhash = meta['x-container-block-hash']
279 size = size if size is not None else fstat(fileobj.fileno()).st_size
# ceil(size / blocksize) without floats
280 nblocks = 1 + (size - 1) // blocksize
281 return (blocksize, blockhash, size, nblocks)
# PUT the object hashmap. A 201 means the server already has every block
# (returns (None, headers)); otherwise the response body lists the block
# hashes the server is missing and must be uploaded.
283 def _create_object_or_get_missing_hashes(
290 if_etag_not_match=None,
291 content_encoding=None,
292 content_disposition=None,
300 content_type=content_type,
302 if_etag_match=if_etag_match,
303 if_etag_not_match=if_etag_not_match,
304 content_encoding=content_encoding,
305 content_disposition=content_disposition,
306 permissions=permissions,
309 return (None if r.status_code == 201 else r.json), r.headers
# Compute the Pithos+ hash of every block of fileobj, filling the caller's
# `hashes` list (ordered) and `hmap` dict (hash -> (offset, length)).
# hash_cb, if given, yields a progress tick per block.
311 def _calculate_blocks_for_upload(
312 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
316 hash_gen = hash_cb(nblocks)
319 for i in range(nblocks):
320 block = readall(fileobj, min(blocksize, size - offset))
322 hash = _pithos_hash(block, blockhash)
324 hmap[hash] = (offset, bytes)
# sanity check: all bytes of the source must have been consumed
328 msg = ('Failed to calculate uploaded blocks:'
329 ' Offset and object size do not match')
330 assert offset == size, msg
# Upload all blocks the server reported missing, bounded by the client's
# thread limit. Threads that raised are collected as failures; a 502 from the
# server shrinks POOLSIZE to the adapted thread limit. Returns the hashes
# that still failed so the caller can retry.
332 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
333 """upload missing blocks asynchronously"""
335 self._init_thread_limit()
340 offset, bytes = hmap[hash]
342 data = readall(fileobj, bytes)
343 r = self._put_block_async(data, hash)
345 unfinished = self._watch_thread_limit(flying)
346 for thread in set(flying).difference(unfinished):
348 failures.append(thread)
# back off: a 502 indicates server overload, reduce concurrency
351 ClientError) and thread.exception.status == 502:
352 self.POOLSIZE = self._thread_limit
353 elif thread.isAlive():
354 flying.append(thread)
362 for thread in flying:
365 failures.append(thread)
372 return [failure.kwargs['hash'] for failure in failures]
# (def line elided in this excerpt -- presumably `def upload_object(...)`.)
# Threaded chunked upload: compute local block hashes, PUT the hashmap, then
# retry-upload whatever blocks the server reports missing; finally re-PUT the
# hashmap to commit the object. KeyboardInterrupt waits for worker threads.
382 content_encoding=None,
383 content_disposition=None,
387 container_info_cache=None):
388 """Upload an object using multiple connections (threads)
390 :param obj: (str) remote object path
392 :param f: open file descriptor (rb)
394 :param hash_cb: optional progress.bar object for calculating hashes
396 :param upload_cb: optional progress.bar object for uploading
400 :param if_etag_match: (str) Push that value to if-match header at file
403 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
404 it does not exist remotely, otherwise the operation will fail.
405 Involves the case of an object with the same path is created while
406 the object is being uploaded.
408 :param content_encoding: (str)
410 :param content_disposition: (str)
412 :param content_type: (str)
414 :param sharing: {'read':[user and/or grp names],
415 'write':[usr and/or grp names]}
417 :param public: (bool)
419 :param container_info_cache: (dict) if given, avoid redundant calls to
420 server for container info (block size and hash information)
422 self._assert_container()
425 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
426 f, size, container_info_cache)
427 (hashes, hmap, offset) = ([], {}, 0)
429 content_type = 'application/octet-stream'
431 self._calculate_blocks_for_upload(
438 hashmap = dict(bytes=size, hashes=hashes)
439 missing, obj_headers = self._create_object_or_get_missing_hashes(
441 content_type=content_type,
443 if_etag_match=if_etag_match,
# '*' with if-none-match implements the only-if-absent semantics
444 if_etag_not_match='*' if if_not_exist else None,
445 content_encoding=content_encoding,
446 content_disposition=content_disposition,
# fast-forward the progress bar over blocks already on the server
454 upload_gen = upload_cb(len(missing))
455 for i in range(len(missing), len(hashmap['hashes']) + 1):
466 sendlog.info('%s blocks missing' % len(missing))
467 num_of_blocks = len(missing)
468 missing = self._upload_missing_blocks(
# no progress between retries means the upload is stuck -- give up
474 if num_of_blocks == len(missing):
477 num_of_blocks = len(missing)
482 details = ['%s' % thread.exception for thread in missing]
484 details = ['Also, failed to read thread exceptions']
486 '%s blocks failed to upload' % len(missing),
488 except KeyboardInterrupt:
489 sendlog.info('- - - wait for threads to finish')
490 for thread in activethreads():
498 content_type=content_type,
499 content_encoding=content_encoding,
500 if_etag_match=if_etag_match,
501 if_etag_not_match='*' if if_not_exist else None,
# Threaded chunked upload from an in-memory string: same protocol as
# upload_object (hashmap PUT, upload missing blocks, commit), but blocks are
# slices of input_str and hmap stores (offset, block-data) instead of sizes.
509 def upload_from_string(
510 self, obj, input_str,
516 content_encoding=None,
517 content_disposition=None,
521 container_info_cache=None):
522 """Upload an object using multiple connections (threads)
524 :param obj: (str) remote object path
526 :param input_str: (str) upload content
528 :param hash_cb: optional progress.bar object for calculating hashes
530 :param upload_cb: optional progress.bar object for uploading
534 :param if_etag_match: (str) Push that value to if-match header at file
537 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
538 it does not exist remotely, otherwise the operation will fail.
539 Involves the case of an object with the same path is created while
540 the object is being uploaded.
542 :param content_encoding: (str)
544 :param content_disposition: (str)
546 :param content_type: (str)
548 :param sharing: {'read':[user and/or grp names],
549 'write':[usr and/or grp names]}
551 :param public: (bool)
553 :param container_info_cache: (dict) if given, avoid redundant calls to
554 server for container info (block size and hash information)
556 self._assert_container()
# fileobj=None is safe: size is given, so fstat is never reached
558 blocksize, blockhash, size, nblocks = self._get_file_block_info(
559 fileobj=None, size=len(input_str), cache=container_info_cache)
560 (hashes, hmap, offset) = ([], {}, 0)
562 content_type = 'application/octet-stream'
566 for blockid in range(nblocks):
567 start = blockid * blocksize
568 block = input_str[start: (start + blocksize)]
569 hashes.append(_pithos_hash(block, blockhash))
570 hmap[hashes[blockid]] = (start, block)
572 hashmap = dict(bytes=size, hashes=hashes)
573 missing, obj_headers = self._create_object_or_get_missing_hashes(
575 content_type=content_type,
577 if_etag_match=if_etag_match,
578 if_etag_not_match='*' if if_not_exist else None,
579 content_encoding=content_encoding,
580 content_disposition=content_disposition,
585 num_of_missing = len(missing)
588 self.progress_bar_gen = upload_cb(nblocks)
589 for i in range(nblocks + 1 - num_of_missing):
# retry loop: keep re-uploading until no blocks are missing or no progress
595 while tries and missing:
599 offset, block = hmap[hash]
600 bird = self._put_block_async(block, hash)
602 unfinished = self._watch_thread_limit(flying)
603 for thread in set(flying).difference(unfinished):
605 failures.append(thread.kwargs['hash'])
607 flying.append(thread)
611 for thread in flying:
614 failures.append(thread.kwargs['hash'])
# same failure count as last round -> stuck, stop retrying
617 if missing and len(missing) == old_failures:
619 old_failures = len(missing)
622 '%s blocks failed to upload' % len(missing),
623 details=['%s' % thread.exception for thread in missing])
624 except KeyboardInterrupt:
625 sendlog.info('- - - wait for threads to finish')
626 for thread in activethreads():
634 content_type=content_type,
635 content_encoding=content_encoding,
636 if_etag_match=if_etag_match,
637 if_etag_not_match='*' if if_not_exist else None,
645 # download_* auxiliary methods
# Fetch the remote object's hashmap and return its block geometry plus a
# hash -> [block indices] map. data_range is popped before the hashmap call
# (a ranged request would truncate the hashmap) and restored afterwards.
646 def _get_remote_blocks_info(self, obj, **restargs):
647 #retrieve object hashmap
648 myrange = restargs.pop('data_range', None)
649 hashmap = self.get_object_hashmap(obj, **restargs)
650 restargs['data_range'] = myrange
651 blocksize = int(hashmap['block_size'])
652 blockhash = hashmap['block_hash']
653 total_size = hashmap['bytes']
654 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
656 for i, h in enumerate(hashmap['hashes']):
657 # was `map_dict[h] = i`; changed to a list so identical blocks that
# appear at several positions all get downloaded/written
659 map_dict[h].append(i)
662 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
# Sequentially GET each block (ranged requests) and write it to dst, clipping
# each block window against the caller-requested range via _range_up.
664 def _dump_blocks_sync(
665 self, obj, remote_hashes, blocksize, total_size, dst, crange,
669 for blockid, blockhash in enumerate(remote_hashes):
671 start = blocksize * blockid
672 is_last = start + blocksize > total_size
673 end = (total_size - 1) if is_last else (start + blocksize - 1)
674 data_range = _range_up(start, end, total_size, crange)
678 args['data_range'] = 'bytes=%s' % data_range
679 r = self.object_get(obj, success=(200, 206), **args)
# Fire an asynchronous ranged GET for one block via SilentEvent.
# NOTE(review): the event start/return lines are elided in this excerpt.
684 def _get_block_async(self, obj, **args):
685 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
# Hash `size` bytes of the local file to compare against a remote block hash
# (used by resume to skip already-downloaded blocks).
# NOTE(review): a seek to `start` appears to be elided between the def and the
# read; also this strips NULs from BOTH ends, unlike _pithos_hash's rstrip --
# confirm against the full source.
689 def _hash_from_file(self, fp, start, size, blockhash):
691 block = readall(fp, size)
692 h = newhashlib(blockhash)
693 h.update(block.strip('\x00'))
694 return hexlify(h.digest())
# Drain finished download threads: write each fetched block to every file
# position recorded for its hash (duplicate blocks are written multiple
# times), shifted by `offset` for ranged downloads.
696 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
697 """write the results of a greenleted rest call to a file
699 :param offset: the offset of the file up to blocksize
700 - e.g. if the range is 10-100, all blocks will be written to
703 for key, g in flying.items():
708 block = g.value.content
709 for block_start in blockids[key]:
710 local_file.seek(block_start + offset)
711 local_file.write(block)
# Threaded block download: for each remote hash, skip block positions already
# present on disk with a matching hash (resume), then fetch the rest with
# async ranged GETs, flushing finished threads to the file as we go.
717 def _dump_blocks_async(
718 self, obj, remote_hashes, blocksize, total_size, local_file,
719 blockhash=None, resume=False, filerange=None, **restargs):
720 file_size = fstat(local_file.fileno()).st_size if resume else 0
722 blockid_dict = dict()
725 self._init_thread_limit()
726 for block_hash, blockids in remote_hashes.items():
# convert block indices to byte offsets
727 blockids = [blk * blocksize for blk in blockids]
728 unsaved = [blk for blk in blockids if not (
729 blk < file_size and block_hash == self._hash_from_file(
730 local_file, blk, blocksize, blockhash))]
# tick the progress bar for blocks that were already saved locally
731 self._cb_next(len(blockids) - len(unsaved))
735 self._watch_thread_limit(flying.values())
737 flying, blockid_dict, local_file, offset,
739 end = total_size - 1 if (
740 key + blocksize > total_size) else key + blocksize - 1
741 data_range = _range_up(key, end, total_size, filerange)
746 'async_headers'] = {'Range': 'bytes=%s' % data_range}
747 flying[key] = self._get_block_async(obj, **restargs)
748 blockid_dict[key] = unsaved
750 for thread in flying.values():
752 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
# (def line elided in this excerpt -- presumably `def download_object(...)`.)
# Download to an open seekable file: fetch the remote hashmap, then dump
# blocks either sequentially (sync) or with threads (async, supports resume),
# and finally truncate the destination to the exact object size.
762 if_modified_since=None,
763 if_unmodified_since=None):
764 """Download an object (multiple connections, random blocks)
766 :param obj: (str) remote object path
768 :param dst: open file descriptor (wb+)
770 :param download_cb: optional progress.bar object for downloading
772 :param version: (str) file version
774 :param resume: (bool) if set, preserve already downloaded file parts
776 :param range_str: (str) from, to are file positions (int) in bytes
778 :param if_match: (str)
780 :param if_none_match: (str)
782 :param if_modified_since: (str) formated date
784 :param if_unmodified_since: (str) formated date"""
787 data_range=None if range_str is None else 'bytes=%s' % range_str,
789 if_none_match=if_none_match,
790 if_modified_since=if_modified_since,
791 if_unmodified_since=if_unmodified_since)
798 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
799 assert total_size >= 0
802 self.progress_bar_gen = download_cb(len(hash_list))
806 self._dump_blocks_sync(
815 self._dump_blocks_async(
# drop any stale bytes beyond the object's size (relevant with resume)
826 dst.truncate(total_size)
# Threaded download into memory: one async ranged GET per block, results
# collected in a per-block list and joined by the caller-side return
# (elided here). KeyboardInterrupt waits for worker threads.
830 def download_to_string(
837 if_modified_since=None,
838 if_unmodified_since=None):
839 """Download an object to a string (multiple connections). This method
840 uses threads for http requests, but stores all content in memory.
842 :param obj: (str) remote object path
844 :param download_cb: optional progress.bar object for downloading
846 :param version: (str) file version
848 :param range_str: (str) from, to are file positions (int) in bytes
850 :param if_match: (str)
852 :param if_none_match: (str)
854 :param if_modified_since: (str) formated date
856 :param if_unmodified_since: (str) formated date
858 :returns: (str) the whole object contents
862 data_range=None if range_str is None else 'bytes=%s' % range_str,
864 if_none_match=if_none_match,
865 if_modified_since=if_modified_since,
866 if_unmodified_since=if_unmodified_since)
873 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
874 assert total_size >= 0
877 self.progress_bar_gen = download_cb(len(hash_list))
880 num_of_blocks = len(remote_hashes)
# pre-size the result list so blocks can be stored out of order
881 ret = [''] * num_of_blocks
882 self._init_thread_limit()
885 for blockid, blockhash in enumerate(remote_hashes):
886 start = blocksize * blockid
887 is_last = start + blocksize > total_size
888 end = (total_size - 1) if is_last else (start + blocksize - 1)
# NOTE(review): max_value is `end` here, while _dump_blocks_sync passes
# total_size -- looks inconsistent; confirm against _range_up's contract.
889 data_range_str = _range_up(start, end, end, range_str)
891 self._watch_thread_limit(flying.values())
892 restargs['data_range'] = 'bytes=%s' % data_range_str
893 flying[blockid] = self._get_block_async(obj, **restargs)
894 for runid, thread in flying.items():
# on the last block, wait for all threads instead of skipping live ones
895 if (blockid + 1) == num_of_blocks:
897 elif thread.isAlive():
900 raise thread.exception
901 ret[runid] = thread.value.content
905 except KeyboardInterrupt:
906 sendlog.info('- - - wait for threads to finish')
907 for thread in activethreads():
910 #Command Progress Bar method
# Advance the progress bar generator by `step` ticks, if one is installed.
# `xrange`/`.next()` mark this as Python 2 code.
911 def _cb_next(self, step=1):
912 if hasattr(self, 'progress_bar_gen'):
914 for i in xrange(step):
915 self.progress_bar_gen.next()
# Drain the progress bar generator to completion.
# NOTE(review): the surrounding loop/exception handling is elided here.
919 def _complete_cb(self):
922 self.progress_bar_gen.next()
# GET the object's hashmap (format=json, hashmap=True in elided lines).
# 304/412 from conditional headers are treated specially rather than raised
# as-is (handling elided in this excerpt).
926 def get_object_hashmap(
931 if_modified_since=None,
932 if_unmodified_since=None):
934 :param obj: (str) remote object path
936 :param if_match: (str)
938 :param if_none_match: (str)
940 :param if_modified_since: (str) formated date
942 :param if_unmodified_since: (str) formated date
951 if_etag_match=if_match,
952 if_etag_not_match=if_none_match,
953 if_modified_since=if_modified_since,
954 if_unmodified_since=if_unmodified_since)
955 except ClientError as err:
956 if err.status == 304 or err.status == 412:
# Create or replace an account user group with the given member list.
961 def set_account_group(self, group, usernames):
965 :param usernames: (list)
967 r = self.account_post(update=True, groups={group: usernames})
970 def del_account_group(self, group):
974 self.account_post(update=True, groups={group: []})
# HEAD the account; a 401 is translated to an explicit "No authorization"
# ClientError. The return of the response headers is elided in this excerpt.
976 def get_account_info(self, until=None):
978 :param until: (str) formated date
982 r = self.account_head(until=until)
983 if r.status_code == 401:
984 raise ClientError("No authorization", status=401)
# Extract the X-Account-Policy-Quota header(s) from the account info
# (the enclosing filter_in call line is elided in this excerpt).
987 def get_account_quota(self):
992 self.get_account_info(),
993 'X-Account-Policy-Quota',
996 #def get_account_versioning(self):
1001 # self.get_account_info(),
1002 # 'X-Account-Policy-Versioning',
1005 def get_account_meta(self, until=None):
1007 :param until: (str) formated date
1011 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1013 def get_account_group(self):
1017 return filter_in(self.get_account_info(), 'X-Account-Group-')
# Merge (update=True) custom metadata into the account.
1019 def set_account_meta(self, metapairs):
1021 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1023 assert(type(metapairs) is dict)
1024 r = self.account_post(update=True, metadata=metapairs)
# Delete one account metadatum; posting an empty value removes the key.
1027 def del_account_meta(self, metakey):
1029 :param metakey: (str) metadatum key
1031 r = self.account_post(update=True, metadata={metakey: ''})
1034 #def set_account_quota(self, quota):
1036 # :param quota: (int)
1038 # self.account_post(update=True, quota=quota)
1040 #def set_account_versioning(self, versioning):
1042 # :param versioning: (str)
1044 # r = self.account_post(update=True, versioning=versioning)
# List the account's containers (the return of r.json is elided here).
1047 def list_containers(self):
1051 r = self.account_get()
# Delete self.container. 404 and 409 are accepted as response codes and then
# re-raised as descriptive ClientErrors (not-found / not-empty).
1054 def del_container(self, until=None, delimiter=None):
1056 :param until: (str) formated date
1058 :param delimiter: (str) with / empty container
1060 :raises ClientError: 404 Container does not exist
1062 :raises ClientError: 409 Container is not empty
1064 self._assert_container()
1065 r = self.container_delete(
1067 delimiter=delimiter,
1068 success=(204, 404, 409))
1069 if r.status_code == 404:
1071 'Container "%s" does not exist' % self.container,
1073 elif r.status_code == 409:
1075 'Container "%s" is not empty' % self.container,
# Read the X-Container-Policy-Versioning header of a container, using the
# backup/switch/restore pattern on self.container (try/finally lines elided).
1079 def get_container_versioning(self, container=None):
1081 :param container: (str)
1085 cnt_back_up = self.container
1087 self.container = container or cnt_back_up
1089 self.get_container_info(),
1090 'X-Container-Policy-Versioning')
1092 self.container = cnt_back_up
# Read the X-Container-Policy-Quota header of a container, using the same
# backup/switch/restore pattern on self.container (try/finally lines elided).
1094 def get_container_limit(self, container=None):
1096 :param container: (str)
1100 cnt_back_up = self.container
1102 self.container = container or cnt_back_up
1104 self.get_container_info(),
1105 'X-Container-Policy-Quota')
1107 self.container = cnt_back_up
# HEAD self.container; on error the container name is appended to the
# ClientError details before re-raising (re-raise/return lines elided).
1109 def get_container_info(self, until=None):
1111 :param until: (str) formated date
1115 :raises ClientError: 404 Container not found
1118 r = self.container_head(until=until)
1119 except ClientError as err:
1120 err.details.append('for container %s' % self.container)
# Filter the container's X-Container-Meta-* headers (filter_in call partly
# elided in this excerpt).
1124 def get_container_meta(self, until=None):
1126 :param until: (str) formated date
1131 self.get_container_info(until=until),
# Filter the container's X-Container-Object-Meta headers.
1134 def get_container_object_meta(self, until=None):
1136 :param until: (str) formated date
1141 self.get_container_info(until=until),
1142 'X-Container-Object-Meta')
# Merge (update=True) custom metadata into self.container.
1144 def set_container_meta(self, metapairs):
1146 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1148 assert(type(metapairs) is dict)
1149 r = self.container_post(update=True, metadata=metapairs)
# Delete one container metadatum; an empty value removes the key.
1152 def del_container_meta(self, metakey):
1154 :param metakey: (str) metadatum key
1156 :returns: (dict) response headers
1158 r = self.container_post(update=True, metadata={metakey: ''})
# Set the container size limit (quota) in bytes.
1161 def set_container_limit(self, limit):
1165 r = self.container_post(update=True, quota=limit)
# Set the container versioning policy (e.g. 'auto').
1168 def set_container_versioning(self, versioning):
1170 :param versioning: (str)
1172 r = self.container_post(update=True, versioning=versioning)
# DELETE an object from self.container.
1175 def del_object(self, obj, until=None, delimiter=None):
1177 :param obj: (str) remote object path
1179 :param until: (str) formated date
1181 :param delimiter: (str)
1183 self._assert_container()
1184 r = self.object_delete(obj, until=until, delimiter=delimiter)
# Merge (update=True) custom metadata into an object.
1187 def set_object_meta(self, obj, metapairs):
1189 :param obj: (str) remote object path
1191 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1193 assert(type(metapairs) is dict)
1194 r = self.object_post(obj, update=True, metadata=metapairs)
# Delete one object metadatum; an empty value removes the key.
1197 def del_object_meta(self, obj, metakey):
1199 :param obj: (str) remote object path
1201 :param metakey: (str) metadatum key
1203 r = self.object_post(obj, update=True, metadata={metakey: ''})
1206 def publish_object(self, obj):
1208 :param obj: (str) remote object path
1210 :returns: (str) access url
1212 self.object_post(obj, update=True, public=True)
1213 info = self.get_object_info(obj)
1214 return info['x-object-public']
1215 pref, sep, rest = self.base_url.partition('//')
1216 base = rest.split('/')[0]
1217 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
# Revoke public access from an object (public=False).
1219 def unpublish_object(self, obj):
1221 :param obj: (str) remote object path
1223 r = self.object_post(obj, update=True, public=False)
# HEAD an object; a 404 is re-raised with an object-specific message.
# The success return and final re-raise lines are elided in this excerpt.
1226 def get_object_info(self, obj, version=None):
1228 :param obj: (str) remote object path
1230 :param version: (str)
1235 r = self.object_head(obj, version=version)
1237 except ClientError as ce:
1238 if ce.status == 404:
1239 raise ClientError('Object %s not found' % obj, status=404)
# Filter the object's X-Object-Meta-* headers (filter_in call partly elided).
1242 def get_object_meta(self, obj, version=None):
1244 :param obj: (str) remote object path
1246 :param version: (str)
1251 self.get_object_info(obj, version=version),
# Parse the x-object-sharing header ('key=val;key=val') into a dict;
# malformed entries raise 'Incorrect reply format'.
1254 def get_object_sharing(self, obj):
1256 :param obj: (str) remote object path
1261 self.get_object_info(obj),
1266 perms = r['x-object-sharing'].split(';')
1271 raise ClientError('Incorrect reply format')
1272 (key, val) = perm.strip().split('=')
# Replace an object's sharing grants; falsy permissions become '' which
# clears the corresponding grant on the server.
1276 def set_object_sharing(
1278 read_permission=False, write_permission=False):
1279 """Give read/write permissions to an object.
1281 :param obj: (str) remote object path
1283 :param read_permission: (list - bool) users and user groups that get
1284 read permission for this object - False means all previous read
1285 permissions will be removed
1287 :param write_permission: (list - bool) of users and user groups to get
1288 write permission for this object - False means all previous write
1289 permissions will be removed
1291 :returns: (dict) response headers
1294 perms = dict(read=read_permission or '', write=write_permission or '')
1295 r = self.object_post(obj, update=True, permissions=perms)
1298 def del_object_sharing(self, obj):
1300 :param obj: (str) remote object path
1302 return self.set_object_sharing(obj)
# Append the whole source file to an existing object, block by block, using
# async POSTs with content_range 'bytes */*' (server appends at the end).
# KeyboardInterrupt waits for worker threads before propagating.
1304 def append_object(self, obj, source_file, upload_cb=None):
1306 :param obj: (str) remote object path
1308 :param source_file: open file descriptor
1310 :param upload_cb: progress.bar for uploading
1312 self._assert_container()
1313 meta = self.get_container_info()
1314 blocksize = int(meta['x-container-block-size'])
1315 filesize = fstat(source_file.fileno()).st_size
1316 nblocks = 1 + (filesize - 1) // blocksize
1320 self.progress_bar_gen = upload_cb(nblocks)
1323 self._init_thread_limit()
1325 for i in range(nblocks):
1326 block = source_file.read(min(blocksize, filesize - offset))
1327 offset += len(block)
1329 self._watch_thread_limit(flying.values())
1331 flying[i] = SilentEvent(
1332 method=self.object_post,
1335 content_range='bytes */*',
1336 content_type='application/octet-stream',
1337 content_length=len(block),
1341 for key, thread in flying.items():
1342 if thread.isAlive():
1344 unfinished[key] = thread
1347 if thread.exception:
1348 raise thread.exception
1349 headers[key] = thread.value.headers
1352 except KeyboardInterrupt:
1353 sendlog.info('- - - wait for threads to finish')
1354 for thread in activethreads():
# crude grace period proportional to the number of live threads
1357 from time import sleep
1358 sleep(2 * len(activethreads()))
1359 return headers.values()
# Truncate an object server-side: POST with object_bytes and the object
# itself as source, keeping only bytes 0..upto_bytes.
1361 def truncate_object(self, obj, upto_bytes):
1363 :param obj: (str) remote object path
1365 :param upto_bytes: max number of bytes to leave on file
1367 :returns: (dict) response headers
1369 r = self.object_post(
1372 content_range='bytes 0-%s/*' % upto_bytes,
1373 content_type='application/octet-stream',
1374 object_bytes=upto_bytes,
1375 source_object=path4url(self.container, obj))
# Overwrite the byte range [start, end] of a remote object with data from a
# local file, uploading block-sized ranged POSTs sequentially. The range is
# validated against the remote object's content-length first.
1378 def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1379 """Overwrite a part of an object from local source file
1381 :param obj: (str) remote object path
1383 :param start: (int) position in bytes to start overwriting from
1385 :param end: (int) position in bytes to stop overwriting at
1387 :param source_file: open file descriptor
1389 :param upload_cb: progress.bar for uploading
1392 r = self.get_object_info(obj)
1393 rf_size = int(r['content-length'])
1394 if rf_size < int(start):
1396 'Range start exceeds file size',
1398 elif rf_size < int(end):
1400 'Range end exceeds file size',
1402 self._assert_container()
1403 meta = self.get_container_info()
1404 blocksize = int(meta['x-container-block-size'])
1405 filesize = fstat(source_file.fileno()).st_size
# inclusive range: end - start + 1 bytes will be written
1406 datasize = int(end) - int(start) + 1
1407 nblocks = 1 + (datasize - 1) // blocksize
1410 self.progress_bar_gen = upload_cb(nblocks)
1413 for i in range(nblocks):
1414 read_size = min(blocksize, filesize - offset, datasize - offset)
1415 block = source_file.read(read_size)
1416 r = self.object_post(
1419 content_type='application/octet-stream',
1420 content_length=len(block),
1421 content_range='bytes %s-%s/*' % (
1423 start + offset + len(block) - 1),
1425 headers.append(dict(r.headers))
1426 offset += len(block)
# (def line elided in this excerpt -- presumably `def copy_object(`.)
# Server-side copy: PUT the destination object with the source path, leaving
# self.container pointed at dst_container afterwards (no backup/restore).
1432 self, src_container, src_object, dst_container,
1434 source_version=None,
1435 source_account=None,
1440 :param src_container: (str) source container
1442 :param src_object: (str) source object path
1444 :param dst_container: (str) destination container
1446 :param dst_object: (str) destination object path
1448 :param source_version: (str) source object version
1450 :param source_account: (str) account to copy from
1452 :param public: (bool)
1454 :param content_type: (str)
1456 :param delimiter: (str)
1458 :returns: (dict) response headers
1460 self._assert_account()
1461 self.container = dst_container
1462 src_path = path4url(src_container, src_object)
1463 r = self.object_put(
# destination defaults to the source object name
1464 dst_object or src_object,
1468 source_version=source_version,
1469 source_account=source_account,
1471 content_type=content_type,
1472 delimiter=delimiter)
# (def line elided in this excerpt -- presumably `def move_object(`.)
# Server-side move: like copy_object but the PUT carries move semantics
# (the distinguishing argument lines are elided here).
1476 self, src_container, src_object, dst_container,
1478 source_account=None,
1479 source_version=None,
1484 :param src_container: (str) source container
1486 :param src_object: (str) source object path
1488 :param dst_container: (str) destination container
1490 :param dst_object: (str) destination object path
1492 :param source_account: (str) account to move from
1494 :param source_version: (str) source object version
1496 :param public: (bool)
1498 :param content_type: (str)
1500 :param delimiter: (str)
1502 :returns: (dict) response headers
1504 self._assert_account()
1505 self.container = dst_container
1506 dst_object = dst_object or src_object
1507 src_path = path4url(src_container, src_object)
1508 r = self.object_put(
1513 source_account=source_account,
1514 source_version=source_version,
1516 content_type=content_type,
1517 delimiter=delimiter)
# List accounts sharing objects with self.account via a raw GET with json
# format; limit/marker are only set when provided (iff=...).
1520 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1521 """Get accounts that share with self.account
1525 :param marker: (str)
1529 self._assert_account()
1531 self.set_param('format', 'json')
1532 self.set_param('limit', limit, iff=limit is not None)
1533 self.set_param('marker', marker, iff=marker is not None)
1536 success = kwargs.pop('success', (200, 204))
1537 r = self.get(path, *args, success=success, **kwargs)
1540 def get_object_versionlist(self, obj):
1542 :param obj: (str) remote object path
1546 self._assert_container()
1547 r = self.object_get(obj, format='json', version='list')
1548 return r.json['versions']