# Copyright 2011-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    #  Pithos hashes each block with trailing zero-bytes stripped
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom
    :param end: (int) the window top
    :param max_value: (int) maximum accepted value
    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x where x < y and x, y natural numbers
    :returns: (str) a range string cut-off for the start-end range
        an empty response means this window is out of range
    """
    assert start >= 0, '_range_up called w. start(%s) < 0' % start
    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
        end, start)
    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
        max_value, end)
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            #  a suffix range (-x): the last x bytes of the object
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
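
# Illustration (made-up values): _range_up crops a user-supplied range string
# to the window of one 128-byte block of a 1024-byte object:
#
#     _range_up(0, 127, 1024, '10-20,100-200')   # -> '10-20,100-127'
#     _range_up(512, 639, 1024, '10-20')         # -> '' (out of this window)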


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(
            base_url, token, account, container)
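
    # Example (illustrative; the endpoint URL and token below are made up):
    #
    #     client = PithosClient(
    #         'https://pithos.example.com/object-store/v1', 'my-t0k3n',
    #         account='user@example.com', container='pithos')
    #     client.list_containers()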

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """
        :param container: (str) if not given, self.container is used instead
        :param sizelimit: (int) container total size limit in bytes
        :param versioning: (str) can be auto or whatever supported by server
        :param metadata: (dict) Custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}
        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path
        :param f: open file descriptor
        :param withHashFile: (bool)
        :param size: (int) size of data to upload
        :param etag: (str)
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path
        :param etag: (str)
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source
        :param size: (int) size of data to upload from source
        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
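
    # The block count above is a ceiling division, e.g. (made-up numbers):
    #
    #     blocksize = 4 * 1024 * 1024             # 4MB container blocks
    #     size = 9 * 1024 * 1024                  # 9MB of data to upload
    #     nblocks = 1 + (size - 1) // blocksize   # -> 3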

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format=format,
            hashmap=hashmap,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        #  201 means the object was created; anything else (409) carries the
        #  list of missing block hashes in the response body
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in xrange(nblocks):
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploading blocks: '
               'read bytes(%s) != requested size (%s)' % (offset, size))
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path
        :param f: open file descriptor (rb)
        :param size: (int) size of data to upload
        :param hash_cb: optional progress.bar object for calculating hashes
        :param upload_cb: optional progress.bar object for uploading
        :param etag: (str)
        :param if_etag_match: (str) Push that value to if-match header at file
            creation
        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = content_type or 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing, hmap, f, upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                try:
                    details = ['%s' % thread.exception for thread in missing]
                except Exception:
                    details = ['Also, failed to read thread exceptions']
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
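
    # Example (illustrative; paths and sharing values are made up):
    #
    #     with open('/tmp/photos.tar', 'rb') as f:
    #         client.upload_object(
    #             'backups/photos.tar', f,
    #             content_type='application/x-tar',
    #             sharing={'read': ['user1@example.com']})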

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path
        :param input_str: (str) upload content
        :param hash_cb: optional progress.bar object for calculating hashes
        :param upload_cb: optional progress.bar object for uploading
        :param etag: (str)
        :param if_etag_match: (str) Push that value to if-match header at file
            creation
        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError('%s blocks failed to upload' % len(missing))
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
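
    # Example (illustrative):
    #
    #     client.upload_from_string(
    #         'notes/today.txt', 'hello world', content_type='text/plain')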

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #  retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #  assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  the same hash may appear in more than one block of the object
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
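
    # The returned map_dict maps each block hash to every block index where
    # that content appears, e.g. (made-up hashes):
    #
    #     {'7d7f3e...': [0, 2], '09c1d8...': [1]}
    #
    # so identical blocks are downloaded once and written to all positions.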

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            data_range = _range_up(start, end, total_size, crange)
            if not data_range:
                self._cb_next()
                continue
            args['data_range'] = 'bytes=%s' % data_range
            r = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(r.content)
            dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        #  rstrip, as in _pithos_hash, so local and remote hashes compare
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            for key in unsaved:
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs[
                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path
        :param dst: open file descriptor (wb+)
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param resume: (bool) if set, preserve already downloaded file parts
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formatted date
        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
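
    # Example (illustrative): resumable download into a local file
    #
    #     with open('/tmp/photos.tar', 'wb+') as dst:
    #         client.download_object('backups/photos.tar', dst, resume=True)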

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formatted date
        :param if_unmodified_since: (str) formatted date
        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
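
    # Example (illustrative): fetch only the first kilobyte as a string
    #
    #     head = client.download_to_string(
    #         'notes/today.txt', range_str='0-1023')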

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formatted date
        :param if_unmodified_since: (str) formatted date
        :returns: (dict)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)
        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r.headers

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date
        :param delimiter: (str) with / empty container
        :raises ClientError: 404 Container does not exist
        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)
        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)
        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, container=None, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        :raises ClientError: 404 Container not found
        """
        bck_cont = self.container
        try:
            self.container = container or bck_cont
            self._assert_container()
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        finally:
            self.container = bck_cont
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until), 'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date
        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until), 'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path
        :param until: (str) formatted date
        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path
        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        #  the server returns a ready-to-use access URL in x-object-public
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path
        :param version: (str)
        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path
        :param version: (str)
        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path
        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed
        :param write_permission: (list - bool) of users and user groups to get
            write permission for this object - False means all previous write
            permissions will be removed
        :returns: (dict) response headers
        """
        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)
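
    # Example (illustrative usernames; the reply shape follows the parsing in
    # get_object_sharing above):
    #
    #     client.set_object_sharing(
    #         'notes/today.txt', read_permission=['user1@example.com'])
    #     client.get_object_sharing('notes/today.txt')
    #     # -> {'read': 'user1@example.com'}
    #     client.del_object_sharing('notes/today.txt')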

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path
        :param source_file: open file descriptor
        :param upload_cb: optional progress.bar object for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if (i + 1) < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path
        :param upto_bytes: max number of bytes to leave on file
        :returns: (dict) response headers
        """
        ctype = self.get_object_info(obj)['content-type']
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type=ctype,
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(
            self, obj, start, end, source_file,
            source_version=None, upload_cb=None):
        """Overwrite a part of an object from local source file
        ATTENTION: content_type must always be application/octet-stream

        :param obj: (str) remote object path
        :param start: (int) position in bytes to start overwriting from
        :param end: (int) position in bytes to stop overwriting at
        :param source_file: open file descriptor
        :param upload_cb: optional progress.bar object for uploading
        """
        self._assert_container()
        r = self.get_object_info(obj, version=source_version)
        rf_size = int(r['content-length'])
        start, end = int(start), int(end)
        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
            start, rf_size)
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = end - start + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                source_version=source_version,
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)
            self._cb_next()
        self._complete_cb()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_version: (str) source object version
        :param source_account: (str) account to copy from
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
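
    # Example (illustrative): server-side copy between containers
    #
    #     client.copy_object(
    #         'pithos', 'notes/today.txt', 'backups',
    #         dst_object='notes/today.txt.bak')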

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_account: (str) account to move from
        :param source_version: (str) source object version
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)
        :param marker: (str)
        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path
        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']