1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos_rest_api import PithosRestAPI
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestAPI):
69 """GRNet Pithos API client"""
71 _thread_exceptions = []
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize a Pithos client, optionally bound to an account/container."""
        super(PithosClient, self).__init__(base_url, token, account, container)
76 def purge_container(self):
77 """Delete an empty container and destroy associated blocks
79 r = self.container_delete(until=unicode(time()))
82 def upload_object_unchunked(self, obj, f,
86 content_encoding=None,
87 content_disposition=None,
92 :param obj: (str) remote object path
94 :param f: open file descriptor
96 :param withHashFile: (bool)
98 :param size: (int) size of data to upload
102 :param content_encoding: (str)
104 :param content_disposition: (str)
106 :param content_type: (str)
108 :param sharing: {'read':[user and/or grp names],
109 'write':[usr and/or grp names]}
111 :param public: (bool)
113 self._assert_container()
119 data = json.dumps(json.loads(data))
121 raise ClientError(message='"%s" is not json-formated' % f.name,
124 raise ClientError(message='"%s" is not a valid hashmap file'\
127 data = f.read(size) if size is not None else f.read()
128 r = self.object_put(obj,
131 content_encoding=content_encoding,
132 content_disposition=content_disposition,
133 content_type=content_type,
139 def create_object_by_manifestation(self, obj,
141 content_encoding=None,
142 content_disposition=None,
147 :param obj: (str) remote object path
151 :param content_encoding: (str)
153 :param content_disposition: (str)
155 :param content_type: (str)
157 :param sharing: {'read':[user and/or grp names],
158 'write':[usr and/or grp names]}
160 :param public: (bool)
162 self._assert_container()
163 r = self.object_put(obj,
166 content_encoding=content_encoding,
167 content_disposition=content_disposition,
168 content_type=content_type,
171 manifest='%s/%s' % (self.container, obj))
174 # upload_* auxiliary methods
175 def _put_block_async(self, data, hash, upload_gen=None):
176 event = SilentEvent(method=self._put_block, data=data, hash=hash)
180 def _put_block(self, data, hash):
181 from random import randint
182 if not randint(0, 7):
183 raise ClientError('BAD GATEWAY STUFF', 503)
184 r = self.container_post(update=True,
185 content_type='application/octet-stream',
186 content_length=len(data),
189 assert r.json[0] == hash, 'Local hash does not match server'
191 def _get_file_block_info(self, fileobj, size=None):
192 meta = self.get_container_info()
193 blocksize = int(meta['x-container-block-size'])
194 blockhash = meta['x-container-block-hash']
195 size = size if size is not None else fstat(fileobj.fileno()).st_size
196 nblocks = 1 + (size - 1) // blocksize
197 return (blocksize, blockhash, size, nblocks)
199 def _get_missing_hashes(self, obj, json,
205 content_encoding=None,
206 content_disposition=None,
210 r = self.object_put(obj,
213 content_type=content_type,
216 content_encoding=content_encoding,
217 content_disposition=content_disposition,
218 permissions=permissions,
221 if r.status_code == 201:
226 def _caclulate_uploaded_blocks(self,
237 hash_gen = hash_cb(nblocks)
240 for i in range(nblocks):
241 block = fileobj.read(min(blocksize, size - offset))
243 hash = _pithos_hash(block, blockhash)
245 hmap[hash] = (offset, bytes)
250 assert offset == size, \
251 "Failed to calculate uploaded blocks: " \
252 "Offset and object size do not match"
254 def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
255 """upload missing blocks asynchronously"""
257 self._init_thread_limit()
262 offset, bytes = hmap[hash]
264 data = fileobj.read(bytes)
265 r = self._put_block_async(data, hash, upload_gen)
267 unfinished = self._watch_thread_limit(flying)
268 for thread in set(flying).difference(unfinished):
270 failures.append(thread)
271 if isinstance(thread.exception, ClientError)\
272 and thread.exception.status == 502:
273 self.POOLSIZE = self._thread_limit
274 elif thread.isAlive():
275 flying.append(thread)
283 for thread in flying:
286 failures.append(thread)
293 return [failure.kwargs['hash'] for failure in failures]
295 def upload_object(self, obj, f,
300 content_encoding=None,
301 content_disposition=None,
305 """Upload an object using multiple connections (threads)
307 :param obj: (str) remote object path
309 :param f: open file descriptor (rb)
311 :param hash_cb: optional progress.bar object for calculating hashes
313 :param upload_cb: optional progress.bar object for uploading
317 :param content_encoding: (str)
319 :param content_disposition: (str)
321 :param content_type: (str)
323 :param sharing: {'read':[user and/or grp names],
324 'write':[usr and/or grp names]}
326 :param public: (bool)
328 self._assert_container()
331 block_info = (blocksize, blockhash, size, nblocks) =\
332 self._get_file_block_info(f, size)
333 (hashes, hmap, offset) = ([], {}, 0)
334 if content_type is None:
335 content_type = 'application/octet-stream'
337 self._caclulate_uploaded_blocks(*block_info,
343 hashmap = dict(bytes=size, hashes=hashes)
344 missing = self._get_missing_hashes(obj, hashmap,
345 content_type=content_type,
348 content_encoding=content_encoding,
349 content_disposition=content_disposition,
357 upload_gen = upload_cb(len(missing))
358 for i in range(len(missing), len(hashmap['hashes']) + 1):
369 sendlog.info('%s blocks missing' % len(missing))
370 num_of_blocks = len(missing)
371 missing = self._upload_missing_blocks(
377 if num_of_blocks == len(missing):
380 num_of_blocks = len(missing)
385 '%s blocks failed to upload' % len(missing),
387 except KeyboardInterrupt:
388 sendlog.info('- - - wait for threads to finish')
389 for thread in activethreads():
397 content_type=content_type,
402 # download_* auxiliary methods
403 def _get_remote_blocks_info(self, obj, **restargs):
404 #retrieve object hashmap
405 myrange = restargs.pop('data_range', None)
406 hashmap = self.get_object_hashmap(obj, **restargs)
407 restargs['data_range'] = myrange
408 blocksize = int(hashmap['block_size'])
409 blockhash = hashmap['block_hash']
410 total_size = hashmap['bytes']
411 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
413 for i, h in enumerate(hashmap['hashes']):
415 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
417 def _dump_blocks_sync(self,
425 for blockid, blockhash in enumerate(remote_hashes):
426 if blockhash == None:
428 start = blocksize * blockid
429 end = total_size - 1 if start + blocksize > total_size\
430 else start + blocksize - 1
431 (start, end) = _range_up(start, end, range)
432 restargs['data_range'] = 'bytes=%s-%s' % (start, end)
433 r = self.object_get(obj, success=(200, 206), **restargs)
438 def _get_block_async(self, obj, **restargs):
439 event = SilentEvent(self.object_get,
446 def _hash_from_file(self, fp, start, size, blockhash):
448 block = fp.read(size)
449 h = newhashlib(blockhash)
450 h.update(block.strip('\x00'))
451 return hexlify(h.digest())
453 def _thread2file(self,
458 """write the results of a greenleted rest call to a file
459 @offset: the offset of the file up to blocksize
460 - e.g. if the range is 10-100, all
461 blocks will be written to normal_position - 10"""
463 for i, (start, g) in enumerate(flying.items()):
467 block = g.value.content
468 local_file.seek(start - offset)
469 local_file.write(block)
471 finished.append(flying.pop(start))
475 def _dump_blocks_async(self,
486 file_size = fstat(local_file.fileno()).st_size if resume else 0
490 if filerange is not None:
491 rstart = int(filerange.split('-')[0])
492 offset = rstart if blocksize > rstart else rstart % blocksize
494 self._init_thread_limit()
495 for block_hash, blockid in remote_hashes.items():
496 start = blocksize * blockid
497 if start < file_size\
498 and block_hash == self._hash_from_file(
505 self._watch_thread_limit(flying.values())
506 finished += self._thread2file(
511 end = total_size - 1 if start + blocksize > total_size\
512 else start + blocksize - 1
513 (start, end) = _range_up(start, end, filerange)
517 restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
518 flying[start] = self._get_block_async(obj, **restargs)
520 for thread in flying.values():
522 finished += self._thread2file(flying, local_file, offset, **restargs)
524 def download_object(self,
533 if_modified_since=None,
534 if_unmodified_since=None):
535 """Download an object using multiple connections (threads) and
536 writing to random parts of the file
538 :param obj: (str) remote object path
540 :param dst: open file descriptor (wb+)
542 :param download_cb: optional progress.bar object for downloading
544 :param version: (str) file version
546 :param resume: (bool) if set, preserve already downloaded file parts
548 :param range: (str) from-to where from and to are integers denoting
549 file positions in bytes
551 :param if_match: (str)
553 :param if_none_match: (str)
555 :param if_modified_since: (str) formated date
557 :param if_unmodified_since: (str) formated date
560 restargs = dict(version=version,
561 data_range=None if range is None else 'bytes=%s' % range,
563 if_none_match=if_none_match,
564 if_modified_since=if_modified_since,
565 if_unmodified_since=if_unmodified_since)
571 remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
572 assert total_size >= 0
575 self.progress_bar_gen = download_cb(len(remote_hashes))
579 self._dump_blocks_sync(obj,
587 self._dump_blocks_async(obj,
597 dst.truncate(total_size)
601 #Command Progress Bar method
603 if hasattr(self, 'progress_bar_gen'):
605 self.progress_bar_gen.next()
609 def _complete_cb(self):
612 self.progress_bar_gen.next()
616 def get_object_hashmap(self, obj,
620 if_modified_since=None,
621 if_unmodified_since=None,
624 :param obj: (str) remote object path
626 :param if_match: (str)
628 :param if_none_match: (str)
630 :param if_modified_since: (str) formated date
632 :param if_unmodified_since: (str) formated date
634 :param data_range: (str) from-to where from and to are integers
635 denoting file positions in bytes
640 r = self.object_get(obj,
643 if_etag_match=if_match,
644 if_etag_not_match=if_none_match,
645 if_modified_since=if_modified_since,
646 if_unmodified_since=if_unmodified_since,
647 data_range=data_range)
648 except ClientError as err:
649 if err.status == 304 or err.status == 412:
654 def set_account_group(self, group, usernames):
658 :param usernames: (list)
660 r = self.account_post(update=True, groups={group: usernames})
663 def del_account_group(self, group):
667 r = self.account_post(update=True, groups={group: []})
670 def get_account_info(self, until=None):
672 :param until: (str) formated date
676 r = self.account_head(until=until)
677 if r.status_code == 401:
678 raise ClientError("No authorization")
681 def get_account_quota(self):
685 return filter_in(self.get_account_info(),
686 'X-Account-Policy-Quota',
689 def get_account_versioning(self):
693 return filter_in(self.get_account_info(),
694 'X-Account-Policy-Versioning',
697 def get_account_meta(self, until=None):
699 :meta until: (str) formated date
703 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
705 def get_account_group(self):
709 return filter_in(self.get_account_info(), 'X-Account-Group-')
711 def set_account_meta(self, metapairs):
713 :param metapairs: (dict) {key1:val1, key2:val2, ...}
715 assert(type(metapairs) is dict)
716 r = self.account_post(update=True, metadata=metapairs)
719 def del_account_meta(self, metakey):
721 :param metakey: (str) metadatum key
723 r = self.account_post(update=True, metadata={metakey: ''})
726 def set_account_quota(self, quota):
730 r = self.account_post(update=True, quota=quota)
733 def set_account_versioning(self, versioning):
735 "param versioning: (str)
737 r = self.account_post(update=True, versioning=versioning)
740 def list_containers(self):
744 r = self.account_get()
747 def del_container(self, until=None, delimiter=None):
749 :param until: (str) formated date
751 :param delimiter: (str) with / empty container
753 :raises ClientError: 404 Container does not exist
755 :raises ClientError: 409 Container is not empty
757 self._assert_container()
758 r = self.container_delete(until=until,
760 success=(204, 404, 409))
762 if r.status_code == 404:
763 raise ClientError('Container "%s" does not exist' % self.container,
765 elif r.status_code == 409:
766 raise ClientError('Container "%s" is not empty' % self.container,
769 def get_container_versioning(self, container):
771 :param container: (str)
775 self.container = container
776 return filter_in(self.get_container_info(),
777 'X-Container-Policy-Versioning')
779 def get_container_quota(self, container):
781 :param container: (str)
785 self.container = container
786 return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
788 def get_container_info(self, until=None):
790 :param until: (str) formated date
794 :raises ClientError: 404 Container not found
797 r = self.container_head(until=until)
798 except ClientError as err:
799 err.details.append('for container %s' % self.container)
803 def get_container_meta(self, until=None):
805 :param until: (str) formated date
809 return filter_in(self.get_container_info(until=until),
812 def get_container_object_meta(self, until=None):
814 :param until: (str) formated date
818 return filter_in(self.get_container_info(until=until),
819 'X-Container-Object-Meta')
821 def set_container_meta(self, metapairs):
823 :param metapairs: (dict) {key1:val1, key2:val2, ...}
825 assert(type(metapairs) is dict)
826 r = self.container_post(update=True, metadata=metapairs)
829 def del_container_meta(self, metakey):
831 :param metakey: (str) metadatum key
833 r = self.container_post(update=True, metadata={metakey: ''})
836 def set_container_quota(self, quota):
840 r = self.container_post(update=True, quota=quota)
843 def set_container_versioning(self, versioning):
845 :param versioning: (str)
847 r = self.container_post(update=True, versioning=versioning)
850 def del_object(self, obj, until=None, delimiter=None):
852 :param obj: (str) remote object path
854 :param until: (str) formated date
856 :param delimiter: (str)
858 self._assert_container()
859 r = self.object_delete(obj, until=until, delimiter=delimiter)
862 def set_object_meta(self, obj, metapairs):
864 :param obj: (str) remote object path
866 :param metapairs: (dict) {key1:val1, key2:val2, ...}
868 assert(type(metapairs) is dict)
869 r = self.object_post(obj, update=True, metadata=metapairs)
872 def del_object_meta(self, obj, metakey):
874 :param obj: (str) remote object path
876 :param metakey: (str) metadatum key
878 r = self.object_post(obj, update=True, metadata={metakey: ''})
881 def publish_object(self, obj):
883 :param obj: (str) remote object path
885 :returns: (str) access url
887 r = self.object_post(obj, update=True, public=True)
889 info = self.get_object_info(obj)
890 pref, sep, rest = self.base_url.partition('//')
891 base = rest.split('/')[0]
892 newurl = path4url('%s%s%s' % (pref, sep, base),
893 info['x-object-public'])
896 def unpublish_object(self, obj):
898 :param obj: (str) remote object path
900 r = self.object_post(obj, update=True, public=False)
903 def get_object_info(self, obj, version=None):
905 :param obj: (str) remote object path
907 :param version: (str)
912 r = self.object_head(obj, version=version)
914 except ClientError as ce:
916 raise ClientError('Object not found', status=404)
919 def get_object_meta(self, obj, version=None):
921 :param obj: (str) remote object path
923 :param version: (str)
927 return filter_in(self.get_object_info(obj, version=version),
930 def get_object_sharing(self, obj):
932 :param obj: (str) remote object path
936 r = filter_in(self.get_object_info(obj),
941 perms = r['x-object-sharing'].split(';')
946 raise ClientError('Incorrect reply format')
947 (key, val) = perm.strip().split('=')
951 def set_object_sharing(self, obj,
952 read_permition=False,
953 write_permition=False):
954 """Give read/write permisions to an object.
956 :param obj: (str) remote object path
958 :param read_permition: (list - bool) users and user groups that get
959 read permition for this object - False means all previous read
960 permissions will be removed
962 :param write_perimition: (list - bool) of users and user groups to get
963 write permition for this object - False means all previous write
964 permissions will be removed
967 perms = dict(read='' if not read_permition else read_permition,
968 write='' if not write_permition else write_permition)
969 r = self.object_post(obj, update=True, permissions=perms)
972 def del_object_sharing(self, obj):
974 :param obj: (str) remote object path
976 self.set_object_sharing(obj)
978 def append_object(self, obj, source_file, upload_cb=None):
980 :param obj: (str) remote object path
982 :param source_file: open file descriptor
984 :param upload_db: progress.bar for uploading
987 self._assert_container()
988 meta = self.get_container_info()
989 blocksize = int(meta['x-container-block-size'])
990 filesize = fstat(source_file.fileno()).st_size
991 nblocks = 1 + (filesize - 1) // blocksize
994 upload_gen = upload_cb(nblocks)
996 for i in range(nblocks):
997 block = source_file.read(min(blocksize, filesize - offset))
999 r = self.object_post(obj,
1001 content_range='bytes */*',
1002 content_type='application/octet-stream',
1003 content_length=len(block),
1010 def truncate_object(self, obj, upto_bytes):
1012 :param obj: (str) remote object path
1014 :param upto_bytes: max number of bytes to leave on file
1016 r = self.object_post(obj,
1018 content_range='bytes 0-%s/*' % upto_bytes,
1019 content_type='application/octet-stream',
1020 object_bytes=upto_bytes,
1021 source_object=path4url(self.container, obj))
1024 def overwrite_object(self,
1030 """Overwrite a part of an object from local source file
1032 :param obj: (str) remote object path
1034 :param start: (int) position in bytes to start overwriting from
1036 :param end: (int) position in bytes to stop overwriting at
1038 :param source_file: open file descriptor
1040 :param upload_db: progress.bar for uploading
1043 r = self.get_object_info(obj)
1044 rf_size = int(r['content-length'])
1045 if rf_size < int(start):
1047 'Range start exceeds file size',
1049 elif rf_size < int(end):
1051 'Range end exceeds file size',
1053 self._assert_container()
1054 meta = self.get_container_info()
1055 blocksize = int(meta['x-container-block-size'])
1056 filesize = fstat(source_file.fileno()).st_size
1057 datasize = int(end) - int(start) + 1
1058 nblocks = 1 + (datasize - 1) // blocksize
1061 upload_gen = upload_cb(nblocks)
1063 for i in range(nblocks):
1064 block = source_file.read(min(blocksize,
1067 r = self.object_post(obj,
1069 content_type='application/octet-stream',
1070 content_length=len(block),
1071 content_range='bytes %s-%s/*' % (
1073 start + offset + len(block) - 1),
1075 offset += len(block)
1081 def copy_object(self, src_container, src_object, dst_container,
1083 source_version=None,
1088 :param src_container: (str) source container
1090 :param src_object: (str) source object path
1092 :param dst_container: (str) destination container
1094 :param dst_object: (str) destination object path
1096 :param source_version: (str) source object version
1098 :param public: (bool)
1100 :param content_type: (str)
1102 :param delimiter: (str)
1104 self._assert_account()
1105 self.container = dst_container
1106 dst_object = dst_object or src_object
1107 src_path = path4url(src_container, src_object)
1108 r = self.object_put(dst_object,
1112 source_version=source_version,
1114 content_type=content_type,
1115 delimiter=delimiter)
1118 def move_object(self, src_container, src_object, dst_container,
1120 source_version=None,
1125 :param src_container: (str) source container
1127 :param src_object: (str) source object path
1129 :param dst_container: (str) destination container
1131 :param dst_object: (str) destination object path
1133 :param source_version: (str) source object version
1135 :param public: (bool)
1137 :param content_type: (str)
1139 :param delimiter: (str)
1141 self._assert_account()
1142 self.container = dst_container
1143 dst_object = dst_object or src_object
1144 src_path = path4url(src_container, src_object)
1145 r = self.object_put(dst_object,
1149 source_version=source_version,
1151 content_type=content_type,
1152 delimiter=delimiter)
1155 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1156 """Get accounts that share with self.account
1160 :param marker: (str)
1164 self._assert_account()
1166 self.set_param('format', 'json')
1167 self.set_param('limit', limit, iff=limit is not None)
1168 self.set_param('marker', marker, iff=marker is not None)
1171 success = kwargs.pop('success', (200, 204))
1172 r = self.get(path, *args, success=success, **kwargs)
1175 def get_object_versionlist(self, obj):
1177 :param obj: (str) remote object path
1181 self._assert_container()
1182 r = self.object_get(obj, format='json', version='list')
1183 return r.json['versions']