# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()

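# Illustrative note on the hashing convention above: trailing NUL bytes are
# stripped before hashing, so a short final block hashes the same as its
# zero-padded form. The values below are an assumed example, not server data:
#     _pithos_hash('x' + '\x00' * 100, 'sha256') == _pithos_hash('x', 'sha256')

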
def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) maximum accepted value

    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x with x < y and x, y natural numbers

    :returns: (str) a range string cut-off for the start-end range;
        an empty response means this window is out of range
    """
    assert start >= 0, '_range_up was called with start < 0'
    assert end >= start, '_range_up was called with end < start'
    assert end <= max_value, '_range_up was called with max_value < end'
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)

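# Worked example of the clipping above (illustrative values): for block 0 of
# a 1000-byte object, i.e. the window 0-99:
#     _range_up(0, 99, 1000, '50-150')  # --> '50-99'
#     _range_up(0, 99, 1000, '-100')    # --> '' (the suffix range -100 means
#                                       #     the last 100 bytes, i.e. 900-999)

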
class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever is supported by the
            server

        :param metadata: (dict) custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

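    # Illustrative block arithmetic (assuming a 4MB container block size):
    # a 10MB source gives nblocks = 1 + (10MB - 1) // 4MB = 3 blocks, of
    # sizes 4MB, 4MB and 2MB.
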
    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format=format,
            hashmap=hashmap,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

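    # Hashmap-first upload protocol, in brief: the object is first PUT as a
    # JSON hashmap, e.g. {"bytes": 8388608, "hashes": ["7d3e...", "a1f0..."]}
    # (hash values here are illustrative). A 201 means the server already has
    # every block; a 409 returns the list of missing block hashes, which the
    # caller then uploads with _put_block(_async) before retrying.
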
    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing, hmap, f, upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of the blocks that still failed
                details = ['%s' % miss for miss in missing]
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

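    # Illustrative usage sketch (endpoint, token and paths are placeholders,
    # not part of the API):
    #     client = PithosClient(
    #         'https://pithos.example.com/object-store/v1', '<token>',
    #         account='<account-uuid>', container='pithos')
    #     with open('backup.tar', 'rb') as f:
    #         client.upload_object('backups/backup.tar', f)
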
    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % miss for miss in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size / blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # a hash may appear multiple times, so map each hash to the list
            # of all its block indices
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

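    # Illustrative map_dict content: identical blocks share one hash, so for
    # hashes ['h1', 'h2', 'h1'] the mapping is {'h1': [0, 2], 'h2': [1]} and
    # the 'h1' block is fetched once but written at two offsets.
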
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) object range to download, as 'from-to' byte
            positions (int)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()

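    # Illustrative usage sketch (paths are placeholders): blocks are fetched
    # in parallel and written at their offsets, so the destination must be
    # seekable:
    #     with open('backup.tar', 'wb+') as dst:
    #         client.download_object('backups/backup.tar', dst, resume=True)
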
    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) object range to download, as 'from-to' byte
            positions (int)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), the container contents
            are deleted as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """
        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

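    # Illustrative sharing calls (user names are placeholders):
    #     client.set_object_sharing(
    #         'doc.txt',
    #         read_permission=['user1@example.com'],
    #         write_permission=['user2@example.com'])
    #     client.del_object_sharing('doc.txt')  # drop all permissions
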
    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if (i + 1) < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave in the object

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar for uploading
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)
        self._complete_cb()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']