1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
39 from StringIO import StringIO
41 from binascii import hexlify
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in, readall
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, max_value, a_range):
57 :param start: (int) the window bottom
59 :param end: (int) the window top
61 :param max_value: (int) maximum accepted value
63 :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64 where X: x|x-y|-x where x < y and x, y natural numbers
66 :returns: (str) a range string cut-off for the start-end range
67 an empty response means this window is out of range
69 assert start >= 0, '_range_up called w. start(%s) < 0' % start
70 assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
72 assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
75 return '%s-%s' % (start, end)
77 for some_range in a_range.split(','):
78 v0, sep, v1 = some_range.partition('-')
83 if v1 < start or v0 > end or v1 < v0:
85 v0 = v0 if v0 > start else start
86 v1 = v1 if v1 < end else end
87 selected.append('%s-%s' % (v0, v1))
91 v1 = v0 if v0 <= end else end
92 selected.append('%s-%s' % (start, v1))
95 if max_value - v1 > end:
97 v0 = (max_value - v1) if max_value - v1 > start else start
98 selected.append('%s-%s' % (v0, end))
99 return ','.join(selected)
102 class PithosClient(PithosRestClient):
103 """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """
        :param base_url: (str) Pithos+ service endpoint URL

        :param token: (str) user authentication token

        :param account: (str) account identifier (optional)

        :param container: (str) default container to operate on (optional)
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
108 def create_container(
110 container=None, sizelimit=None, versioning=None, metadata=None,
113 :param container: (str) if not given, self.container is used instead
115 :param sizelimit: (int) container total size limit in bytes
117 :param versioning: (str) can be auto or whatever supported by server
119 :param metadata: (dict) Custom user-defined metadata of the form
120 { 'name1': 'value1', 'name2': 'value2', ... }
122 :returns: (dict) response headers
124 cnt_back_up = self.container
126 self.container = container or cnt_back_up
127 r = self.container_put(
128 quota=sizelimit, versioning=versioning, metadata=metadata,
132 self.container = cnt_back_up
134 def purge_container(self, container=None):
135 """Delete an empty container and destroy associated blocks"""
136 cnt_back_up = self.container
138 self.container = container or cnt_back_up
139 r = self.container_delete(until=unicode(time()))
141 self.container = cnt_back_up
144 def upload_object_unchunked(
149 content_encoding=None,
150 content_disposition=None,
155 :param obj: (str) remote object path
157 :param f: open file descriptor
159 :param withHashFile: (bool)
161 :param size: (int) size of data to upload
165 :param content_encoding: (str)
167 :param content_disposition: (str)
169 :param content_type: (str)
171 :param sharing: {'read':[user and/or grp names],
172 'write':[usr and/or grp names]}
174 :param public: (bool)
176 :returns: (dict) created object metadata
178 self._assert_container()
184 data = json.dumps(json.loads(data))
186 raise ClientError('"%s" is not json-formated' % f.name, 1)
188 msg = '"%s" is not a valid hashmap file' % f.name
189 raise ClientError(msg, 1)
192 data = readall(f, size) if size else f.read()
197 content_encoding=content_encoding,
198 content_disposition=content_disposition,
199 content_type=content_type,
205 def create_object_by_manifestation(
208 content_encoding=None,
209 content_disposition=None,
214 :param obj: (str) remote object path
218 :param content_encoding: (str)
220 :param content_disposition: (str)
222 :param content_type: (str)
224 :param sharing: {'read':[user and/or grp names],
225 'write':[usr and/or grp names]}
227 :param public: (bool)
229 :returns: (dict) created object metadata
231 self._assert_container()
236 content_encoding=content_encoding,
237 content_disposition=content_disposition,
238 content_type=content_type,
241 manifest='%s/%s' % (self.container, obj))
244 # upload_* auxiliary methods
245 def _put_block_async(self, data, hash):
246 event = SilentEvent(method=self._put_block, data=data, hash=hash)
250 def _put_block(self, data, hash):
251 r = self.container_post(
253 content_type='application/octet-stream',
254 content_length=len(data),
257 assert r.json[0] == hash, 'Local hash does not match server'
259 def _get_file_block_info(self, fileobj, size=None, cache=None):
261 :param fileobj: (file descriptor) source
263 :param size: (int) size of data to upload from source
265 :param cache: (dict) if provided, cache container info response to
266 avoid redundant calls
268 if isinstance(cache, dict):
270 meta = cache[self.container]
272 meta = self.get_container_info()
273 cache[self.container] = meta
275 meta = self.get_container_info()
276 blocksize = int(meta['x-container-block-size'])
277 blockhash = meta['x-container-block-hash']
278 size = size if size is not None else fstat(fileobj.fileno()).st_size
279 nblocks = 1 + (size - 1) // blocksize
280 return (blocksize, blockhash, size, nblocks)
282 def _create_object_or_get_missing_hashes(
289 if_etag_not_match=None,
290 content_encoding=None,
291 content_disposition=None,
299 content_type=content_type,
301 if_etag_match=if_etag_match,
302 if_etag_not_match=if_etag_not_match,
303 content_encoding=content_encoding,
304 content_disposition=content_disposition,
305 permissions=permissions,
308 return (None if r.status_code == 201 else r.json), r.headers
310 def _calculate_blocks_for_upload(
311 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
315 hash_gen = hash_cb(nblocks)
318 for i in xrange(nblocks):
319 block = readall(fileobj, min(blocksize, size - offset))
323 hash = _pithos_hash(block, blockhash)
325 hmap[hash] = (offset, bytes)
329 msg = ('Failed to calculate uploading blocks: '
330 'read bytes(%s) != requested size (%s)' % (offset, size))
331 assert offset == size, msg
333 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
334 """upload missing blocks asynchronously"""
336 self._init_thread_limit()
341 offset, bytes = hmap[hash]
343 data = readall(fileobj, bytes)
344 r = self._put_block_async(data, hash)
346 unfinished = self._watch_thread_limit(flying)
347 for thread in set(flying).difference(unfinished):
349 failures.append(thread)
352 ClientError) and thread.exception.status == 502:
353 self.POOLSIZE = self._thread_limit
354 elif thread.isAlive():
355 flying.append(thread)
363 for thread in flying:
366 failures.append(thread)
373 return [failure.kwargs['hash'] for failure in failures]
383 content_encoding=None,
384 content_disposition=None,
388 container_info_cache=None):
389 """Upload an object using multiple connections (threads)
391 :param obj: (str) remote object path
393 :param f: open file descriptor (rb)
395 :param hash_cb: optional progress.bar object for calculating hashes
397 :param upload_cb: optional progress.bar object for uploading
401 :param if_etag_match: (str) Push that value to if-match header at file
404 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
405 it does not exist remotely, otherwise the operation will fail.
406 Involves the case of an object with the same path is created while
407 the object is being uploaded.
409 :param content_encoding: (str)
411 :param content_disposition: (str)
413 :param content_type: (str)
415 :param sharing: {'read':[user and/or grp names],
416 'write':[usr and/or grp names]}
418 :param public: (bool)
420 :param container_info_cache: (dict) if given, avoid redundant calls to
421 server for container info (block size and hash information)
423 self._assert_container()
426 blocksize, blockhash, size, nblocks) = self._get_file_block_info(
427 f, size, container_info_cache)
428 (hashes, hmap, offset) = ([], {}, 0)
430 content_type = 'application/octet-stream'
432 self._calculate_blocks_for_upload(
439 hashmap = dict(bytes=size, hashes=hashes)
440 missing, obj_headers = self._create_object_or_get_missing_hashes(
442 content_type=content_type,
444 if_etag_match=if_etag_match,
445 if_etag_not_match='*' if if_not_exist else None,
446 content_encoding=content_encoding,
447 content_disposition=content_disposition,
455 upload_gen = upload_cb(len(missing))
456 for i in range(len(missing), len(hashmap['hashes']) + 1):
467 sendlog.info('%s blocks missing' % len(missing))
468 num_of_blocks = len(missing)
469 missing = self._upload_missing_blocks(
475 if num_of_blocks == len(missing):
478 num_of_blocks = len(missing)
483 details = ['%s' % thread.exception for thread in missing]
485 details = ['Also, failed to read thread exceptions']
487 '%s blocks failed to upload' % len(missing),
489 except KeyboardInterrupt:
490 sendlog.info('- - - wait for threads to finish')
491 for thread in activethreads():
499 content_type=content_type,
500 content_encoding=content_encoding,
501 if_etag_match=if_etag_match,
502 if_etag_not_match='*' if if_not_exist else None,
510 def upload_from_string(
511 self, obj, input_str,
517 content_encoding=None,
518 content_disposition=None,
522 container_info_cache=None):
523 """Upload an object using multiple connections (threads)
525 :param obj: (str) remote object path
527 :param input_str: (str) upload content
529 :param hash_cb: optional progress.bar object for calculating hashes
531 :param upload_cb: optional progress.bar object for uploading
535 :param if_etag_match: (str) Push that value to if-match header at file
538 :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
539 it does not exist remotely, otherwise the operation will fail.
540 Involves the case of an object with the same path is created while
541 the object is being uploaded.
543 :param content_encoding: (str)
545 :param content_disposition: (str)
547 :param content_type: (str)
549 :param sharing: {'read':[user and/or grp names],
550 'write':[usr and/or grp names]}
552 :param public: (bool)
554 :param container_info_cache: (dict) if given, avoid redundant calls to
555 server for container info (block size and hash information)
557 self._assert_container()
559 blocksize, blockhash, size, nblocks = self._get_file_block_info(
560 fileobj=None, size=len(input_str), cache=container_info_cache)
561 (hashes, hmap, offset) = ([], {}, 0)
563 content_type = 'application/octet-stream'
567 for blockid in range(nblocks):
568 start = blockid * blocksize
569 block = input_str[start: (start + blocksize)]
570 hashes.append(_pithos_hash(block, blockhash))
571 hmap[hashes[blockid]] = (start, block)
573 hashmap = dict(bytes=size, hashes=hashes)
574 missing, obj_headers = self._create_object_or_get_missing_hashes(
576 content_type=content_type,
578 if_etag_match=if_etag_match,
579 if_etag_not_match='*' if if_not_exist else None,
580 content_encoding=content_encoding,
581 content_disposition=content_disposition,
586 num_of_missing = len(missing)
589 self.progress_bar_gen = upload_cb(nblocks)
590 for i in range(nblocks + 1 - num_of_missing):
596 while tries and missing:
600 offset, block = hmap[hash]
601 bird = self._put_block_async(block, hash)
603 unfinished = self._watch_thread_limit(flying)
604 for thread in set(flying).difference(unfinished):
606 failures.append(thread.kwargs['hash'])
608 flying.append(thread)
612 for thread in flying:
615 failures.append(thread.kwargs['hash'])
618 if missing and len(missing) == old_failures:
620 old_failures = len(missing)
623 '%s blocks failed to upload' % len(missing),
624 details=['%s' % thread.exception for thread in missing])
625 except KeyboardInterrupt:
626 sendlog.info('- - - wait for threads to finish')
627 for thread in activethreads():
635 content_type=content_type,
636 content_encoding=content_encoding,
637 if_etag_match=if_etag_match,
638 if_etag_not_match='*' if if_not_exist else None,
646 # download_* auxiliary methods
647 def _get_remote_blocks_info(self, obj, **restargs):
648 #retrieve object hashmap
649 myrange = restargs.pop('data_range', None)
650 hashmap = self.get_object_hashmap(obj, **restargs)
651 restargs['data_range'] = myrange
652 blocksize = int(hashmap['block_size'])
653 blockhash = hashmap['block_hash']
654 total_size = hashmap['bytes']
655 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
657 for i, h in enumerate(hashmap['hashes']):
658 # map_dict[h] = i CHAGE
660 map_dict[h].append(i)
663 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
665 def _dump_blocks_sync(
666 self, obj, remote_hashes, blocksize, total_size, dst, crange,
670 for blockid, blockhash in enumerate(remote_hashes):
672 start = blocksize * blockid
673 is_last = start + blocksize > total_size
674 end = (total_size - 1) if is_last else (start + blocksize - 1)
675 data_range = _range_up(start, end, total_size, crange)
679 args['data_range'] = 'bytes=%s' % data_range
680 r = self.object_get(obj, success=(200, 206), **args)
685 def _get_block_async(self, obj, **args):
686 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
690 def _hash_from_file(self, fp, start, size, blockhash):
692 block = readall(fp, size)
693 h = newhashlib(blockhash)
694 h.update(block.strip('\x00'))
695 return hexlify(h.digest())
697 def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
698 """write the results of a greenleted rest call to a file
700 :param offset: the offset of the file up to blocksize
701 - e.g. if the range is 10-100, all blocks will be written to
704 for key, g in flying.items():
709 block = g.value.content
710 for block_start in blockids[key]:
711 local_file.seek(block_start + offset)
712 local_file.write(block)
718 def _dump_blocks_async(
719 self, obj, remote_hashes, blocksize, total_size, local_file,
720 blockhash=None, resume=False, filerange=None, **restargs):
721 file_size = fstat(local_file.fileno()).st_size if resume else 0
723 blockid_dict = dict()
726 self._init_thread_limit()
727 for block_hash, blockids in remote_hashes.items():
728 blockids = [blk * blocksize for blk in blockids]
729 unsaved = [blk for blk in blockids if not (
730 blk < file_size and block_hash == self._hash_from_file(
731 local_file, blk, blocksize, blockhash))]
732 self._cb_next(len(blockids) - len(unsaved))
735 self._watch_thread_limit(flying.values())
737 flying, blockid_dict, local_file, offset,
739 end = total_size - 1 if (
740 key + blocksize > total_size) else key + blocksize - 1
744 data_range = _range_up(key, end, total_size, filerange)
749 'async_headers'] = {'Range': 'bytes=%s' % data_range}
750 flying[key] = self._get_block_async(obj, **restargs)
751 blockid_dict[key] = unsaved
753 for thread in flying.values():
755 self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
765 if_modified_since=None,
766 if_unmodified_since=None):
767 """Download an object (multiple connections, random blocks)
769 :param obj: (str) remote object path
771 :param dst: open file descriptor (wb+)
773 :param download_cb: optional progress.bar object for downloading
775 :param version: (str) file version
777 :param resume: (bool) if set, preserve already downloaded file parts
779 :param range_str: (str) from, to are file positions (int) in bytes
781 :param if_match: (str)
783 :param if_none_match: (str)
785 :param if_modified_since: (str) formated date
787 :param if_unmodified_since: (str) formated date"""
790 data_range=None if range_str is None else 'bytes=%s' % range_str,
792 if_none_match=if_none_match,
793 if_modified_since=if_modified_since,
794 if_unmodified_since=if_unmodified_since)
801 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
802 assert total_size >= 0
805 self.progress_bar_gen = download_cb(len(hash_list))
809 self._dump_blocks_sync(
818 self._dump_blocks_async(
829 dst.truncate(total_size)
833 def download_to_string(
840 if_modified_since=None,
841 if_unmodified_since=None):
842 """Download an object to a string (multiple connections). This method
843 uses threads for http requests, but stores all content in memory.
845 :param obj: (str) remote object path
847 :param download_cb: optional progress.bar object for downloading
849 :param version: (str) file version
851 :param range_str: (str) from, to are file positions (int) in bytes
853 :param if_match: (str)
855 :param if_none_match: (str)
857 :param if_modified_since: (str) formated date
859 :param if_unmodified_since: (str) formated date
861 :returns: (str) the whole object contents
865 data_range=None if range_str is None else 'bytes=%s' % range_str,
867 if_none_match=if_none_match,
868 if_modified_since=if_modified_since,
869 if_unmodified_since=if_unmodified_since)
876 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
877 assert total_size >= 0
880 self.progress_bar_gen = download_cb(len(hash_list))
883 num_of_blocks = len(remote_hashes)
884 ret = [''] * num_of_blocks
885 self._init_thread_limit()
888 for blockid, blockhash in enumerate(remote_hashes):
889 start = blocksize * blockid
890 is_last = start + blocksize > total_size
891 end = (total_size - 1) if is_last else (start + blocksize - 1)
892 data_range_str = _range_up(start, end, end, range_str)
894 self._watch_thread_limit(flying.values())
895 restargs['data_range'] = 'bytes=%s' % data_range_str
896 flying[blockid] = self._get_block_async(obj, **restargs)
897 for runid, thread in flying.items():
898 if (blockid + 1) == num_of_blocks:
900 elif thread.isAlive():
903 raise thread.exception
904 ret[runid] = thread.value.content
908 except KeyboardInterrupt:
909 sendlog.info('- - - wait for threads to finish')
910 for thread in activethreads():
913 #Command Progress Bar method
914 def _cb_next(self, step=1):
915 if hasattr(self, 'progress_bar_gen'):
917 for i in xrange(step):
918 self.progress_bar_gen.next()
922 def _complete_cb(self):
925 self.progress_bar_gen.next()
929 def get_object_hashmap(
934 if_modified_since=None,
935 if_unmodified_since=None):
937 :param obj: (str) remote object path
939 :param if_match: (str)
941 :param if_none_match: (str)
943 :param if_modified_since: (str) formated date
945 :param if_unmodified_since: (str) formated date
954 if_etag_match=if_match,
955 if_etag_not_match=if_none_match,
956 if_modified_since=if_modified_since,
957 if_unmodified_since=if_unmodified_since)
958 except ClientError as err:
959 if err.status == 304 or err.status == 412:
964 def set_account_group(self, group, usernames):
968 :param usernames: (list)
970 r = self.account_post(update=True, groups={group: usernames})
973 def del_account_group(self, group):
977 self.account_post(update=True, groups={group: []})
979 def get_account_info(self, until=None):
981 :param until: (str) formated date
985 r = self.account_head(until=until)
986 if r.status_code == 401:
987 raise ClientError("No authorization", status=401)
990 def get_account_quota(self):
995 self.get_account_info(),
996 'X-Account-Policy-Quota',
999 #def get_account_versioning(self):
1004 # self.get_account_info(),
1005 # 'X-Account-Policy-Versioning',
1008 def get_account_meta(self, until=None):
1010 :param until: (str) formated date
1014 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
1016 def get_account_group(self):
1020 return filter_in(self.get_account_info(), 'X-Account-Group-')
1022 def set_account_meta(self, metapairs):
1024 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1026 assert(type(metapairs) is dict)
1027 r = self.account_post(update=True, metadata=metapairs)
1030 def del_account_meta(self, metakey):
1032 :param metakey: (str) metadatum key
1034 r = self.account_post(update=True, metadata={metakey: ''})
1037 #def set_account_quota(self, quota):
1039 # :param quota: (int)
1041 # self.account_post(update=True, quota=quota)
1043 #def set_account_versioning(self, versioning):
1045 # :param versioning: (str)
1047 # r = self.account_post(update=True, versioning=versioning)
1050 def list_containers(self):
1054 r = self.account_get()
1057 def del_container(self, until=None, delimiter=None):
1059 :param until: (str) formated date
1061 :param delimiter: (str) with / empty container
1063 :raises ClientError: 404 Container does not exist
1065 :raises ClientError: 409 Container is not empty
1067 self._assert_container()
1068 r = self.container_delete(
1070 delimiter=delimiter,
1071 success=(204, 404, 409))
1072 if r.status_code == 404:
1074 'Container "%s" does not exist' % self.container,
1076 elif r.status_code == 409:
1078 'Container "%s" is not empty' % self.container,
1082 def get_container_versioning(self, container=None):
1084 :param container: (str)
1088 cnt_back_up = self.container
1090 self.container = container or cnt_back_up
1092 self.get_container_info(),
1093 'X-Container-Policy-Versioning')
1095 self.container = cnt_back_up
1097 def get_container_limit(self, container=None):
1099 :param container: (str)
1103 cnt_back_up = self.container
1105 self.container = container or cnt_back_up
1107 self.get_container_info(),
1108 'X-Container-Policy-Quota')
1110 self.container = cnt_back_up
1112 def get_container_info(self, container=None, until=None):
1114 :param until: (str) formated date
1118 :raises ClientError: 404 Container not found
1120 bck_cont = self.container
1122 self.container = container or bck_cont
1123 self._assert_container()
1124 r = self.container_head(until=until)
1125 except ClientError as err:
1126 err.details.append('for container %s' % self.container)
1129 self.container = bck_cont
1132 def get_container_meta(self, until=None):
1134 :param until: (str) formated date
1139 self.get_container_info(until=until), 'X-Container-Meta')
1141 def get_container_object_meta(self, until=None):
1143 :param until: (str) formated date
1148 self.get_container_info(until=until), 'X-Container-Object-Meta')
1150 def set_container_meta(self, metapairs):
1152 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1154 assert(type(metapairs) is dict)
1155 r = self.container_post(update=True, metadata=metapairs)
1158 def del_container_meta(self, metakey):
1160 :param metakey: (str) metadatum key
1162 :returns: (dict) response headers
1164 r = self.container_post(update=True, metadata={metakey: ''})
1167 def set_container_limit(self, limit):
1171 r = self.container_post(update=True, quota=limit)
1174 def set_container_versioning(self, versioning):
1176 :param versioning: (str)
1178 r = self.container_post(update=True, versioning=versioning)
1181 def del_object(self, obj, until=None, delimiter=None):
1183 :param obj: (str) remote object path
1185 :param until: (str) formated date
1187 :param delimiter: (str)
1189 self._assert_container()
1190 r = self.object_delete(obj, until=until, delimiter=delimiter)
1193 def set_object_meta(self, obj, metapairs):
1195 :param obj: (str) remote object path
1197 :param metapairs: (dict) {key1:val1, key2:val2, ...}
1199 assert(type(metapairs) is dict)
1200 r = self.object_post(obj, update=True, metadata=metapairs)
1203 def del_object_meta(self, obj, metakey):
1205 :param obj: (str) remote object path
1207 :param metakey: (str) metadatum key
1209 r = self.object_post(obj, update=True, metadata={metakey: ''})
1212 def publish_object(self, obj):
1214 :param obj: (str) remote object path
1216 :returns: (str) access url
1218 self.object_post(obj, update=True, public=True)
1219 info = self.get_object_info(obj)
1220 return info['x-object-public']
1221 pref, sep, rest = self.base_url.partition('//')
1222 base = rest.split('/')[0]
1223 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
1225 def unpublish_object(self, obj):
1227 :param obj: (str) remote object path
1229 r = self.object_post(obj, update=True, public=False)
1232 def get_object_info(self, obj, version=None):
1234 :param obj: (str) remote object path
1236 :param version: (str)
1241 r = self.object_head(obj, version=version)
1243 except ClientError as ce:
1244 if ce.status == 404:
1245 raise ClientError('Object %s not found' % obj, status=404)
1248 def get_object_meta(self, obj, version=None):
1250 :param obj: (str) remote object path
1252 :param version: (str)
1257 self.get_object_info(obj, version=version),
1260 def get_object_sharing(self, obj):
1262 :param obj: (str) remote object path
1267 self.get_object_info(obj),
1272 perms = r['x-object-sharing'].split(';')
1277 raise ClientError('Incorrect reply format')
1278 (key, val) = perm.strip().split('=')
1282 def set_object_sharing(
1284 read_permission=False, write_permission=False):
1285 """Give read/write permisions to an object.
1287 :param obj: (str) remote object path
1289 :param read_permission: (list - bool) users and user groups that get
1290 read permission for this object - False means all previous read
1291 permissions will be removed
1293 :param write_permission: (list - bool) of users and user groups to get
1294 write permission for this object - False means all previous write
1295 permissions will be removed
1297 :returns: (dict) response headers
1300 perms = dict(read=read_permission or '', write=write_permission or '')
1301 r = self.object_post(obj, update=True, permissions=perms)
1304 def del_object_sharing(self, obj):
1306 :param obj: (str) remote object path
1308 return self.set_object_sharing(obj)
1310 def append_object(self, obj, source_file, upload_cb=None):
1312 :param obj: (str) remote object path
1314 :param source_file: open file descriptor
1316 :param upload_db: progress.bar for uploading
1318 self._assert_container()
1319 meta = self.get_container_info()
1320 blocksize = int(meta['x-container-block-size'])
1321 filesize = fstat(source_file.fileno()).st_size
1322 nblocks = 1 + (filesize - 1) // blocksize
1326 self.progress_bar_gen = upload_cb(nblocks)
1329 self._init_thread_limit()
1331 for i in range(nblocks):
1332 block = source_file.read(min(blocksize, filesize - offset))
1333 offset += len(block)
1335 self._watch_thread_limit(flying.values())
1337 flying[i] = SilentEvent(
1338 method=self.object_post,
1341 content_range='bytes */*',
1342 content_type='application/octet-stream',
1343 content_length=len(block),
1347 for key, thread in flying.items():
1348 if thread.isAlive():
1350 unfinished[key] = thread
1353 if thread.exception:
1354 raise thread.exception
1355 headers[key] = thread.value.headers
1358 except KeyboardInterrupt:
1359 sendlog.info('- - - wait for threads to finish')
1360 for thread in activethreads():
1363 from time import sleep
1364 sleep(2 * len(activethreads()))
1365 return headers.values()
1367 def truncate_object(self, obj, upto_bytes):
1369 :param obj: (str) remote object path
1371 :param upto_bytes: max number of bytes to leave on file
1373 :returns: (dict) response headers
1375 ctype = self.get_object_info(obj)['content-type']
1376 r = self.object_post(
1379 content_range='bytes 0-%s/*' % upto_bytes,
1381 object_bytes=upto_bytes,
1382 source_object=path4url(self.container, obj))
1385 def overwrite_object(
1386 self, obj, start, end, source_file,
1387 content_type=None, upload_cb=None):
1388 """Overwrite a part of an object from local source file
1390 :param obj: (str) remote object path
1392 :param start: (int) position in bytes to start overwriting from
1394 :param end: (int) position in bytes to stop overwriting at
1396 :param source_file: open file descriptor
1398 :param content_type: (str) default: application/octet-stream
1400 :param upload_db: progress.bar for uploading
1403 r = self.get_object_info(obj)
1404 rf_size = int(r['content-length'])
1405 if rf_size < int(start):
1407 'Range start exceeds file size',
1409 elif rf_size < int(end):
1411 'Range end exceeds file size',
1413 self._assert_container()
1414 meta = self.get_container_info()
1415 blocksize = int(meta['x-container-block-size'])
1416 filesize = fstat(source_file.fileno()).st_size
1417 datasize = int(end) - int(start) + 1
1418 nblocks = 1 + (datasize - 1) // blocksize
1421 self.progress_bar_gen = upload_cb(nblocks)
1424 for i in range(nblocks):
1425 read_size = min(blocksize, filesize - offset, datasize - offset)
1426 block = source_file.read(read_size)
1427 r = self.object_post(
1430 content_type=content_type or 'application/octet-stream',
1431 content_length=len(block),
1432 content_range='bytes %s-%s/*' % (
1434 start + offset + len(block) - 1),
1436 headers.append(dict(r.headers))
1437 offset += len(block)
1443 self, src_container, src_object, dst_container,
1445 source_version=None,
1446 source_account=None,
1451 :param src_container: (str) source container
1453 :param src_object: (str) source object path
1455 :param dst_container: (str) destination container
1457 :param dst_object: (str) destination object path
1459 :param source_version: (str) source object version
1461 :param source_account: (str) account to copy from
1463 :param public: (bool)
1465 :param content_type: (str)
1467 :param delimiter: (str)
1469 :returns: (dict) response headers
1471 self._assert_account()
1472 self.container = dst_container
1473 src_path = path4url(src_container, src_object)
1474 r = self.object_put(
1475 dst_object or src_object,
1479 source_version=source_version,
1480 source_account=source_account,
1482 content_type=content_type,
1483 delimiter=delimiter)
1487 self, src_container, src_object, dst_container,
1489 source_account=None,
1490 source_version=None,
1495 :param src_container: (str) source container
1497 :param src_object: (str) source object path
1499 :param dst_container: (str) destination container
1501 :param dst_object: (str) destination object path
1503 :param source_account: (str) account to move from
1505 :param source_version: (str) source object version
1507 :param public: (bool)
1509 :param content_type: (str)
1511 :param delimiter: (str)
1513 :returns: (dict) response headers
1515 self._assert_account()
1516 self.container = dst_container
1517 dst_object = dst_object or src_object
1518 src_path = path4url(src_container, src_object)
1519 r = self.object_put(
1524 source_account=source_account,
1525 source_version=source_version,
1527 content_type=content_type,
1528 delimiter=delimiter)
1531 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1532 """Get accounts that share with self.account
1536 :param marker: (str)
1540 self._assert_account()
1542 self.set_param('format', 'json')
1543 self.set_param('limit', limit, iff=limit is not None)
1544 self.set_param('marker', marker, iff=marker is not None)
1547 success = kwargs.pop('success', (200, 204))
1548 r = self.get(path, *args, success=success, **kwargs)
1551 def get_object_versionlist(self, obj):
1553 :param obj: (str) remote object path
1557 self._assert_container()
1558 r = self.object_get(obj, format='json', version='list')
1559 return r.json['versions']