1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
import json

from binascii import hexlify
from hashlib import new as newhashlib
from os import fstat
from StringIO import StringIO
from threading import enumerate as activethreads
from time import time

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """
        :param base_url: (str) Pithos+ service endpoint URL
        :param token: (str) authentication token
        :param account: (str) account name (optional)
        :param container: (str) default container to operate on (optional)
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
74 def purge_container(self, container=None):
75 """Delete an empty container and destroy associated blocks
77 cnt_back_up = self.container
79 self.container = container or cnt_back_up
80 self.container_delete(until=unicode(time()))
82 self.container = cnt_back_up
    def upload_object_unchunked(
            # NOTE(review): the leading parameters (self, obj, f,
            # withHashFile, size, etag, ...) are on original lines missing
            # from this extract.
            content_encoding=None,
            content_disposition=None,
        """Upload an object in a single request (no block chunking).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        # NOTE(review): the try/except scaffolding around the hashmap-file
        # validation below is missing from this extract; the statements are
        # fragments of that control flow.
        data = json.dumps(json.loads(data))
        raise ClientError('"%s" is not json-formated' % f.name, 1)
        msg = '"%s" is not a valid hashmap file' % f.name
        raise ClientError(msg, 1)
        data = f.read(size) if size else f.read()
        # trailing keyword arguments of the (missing) object_put call:
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
    def create_object_by_manifestation(
            # NOTE(review): leading parameters (self, obj, etag, ...) are on
            # original lines missing from this extract.
            content_encoding=None,
            content_disposition=None,
        """Create a zero-length manifest object referencing uploaded parts.

        :param obj: (str) remote object path

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        # trailing keyword arguments of the (missing) object_put call; the
        # manifest header points at the '<container>/<obj>' part prefix
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
        manifest='%s/%s' % (self.container, obj))
178 # upload_* auxiliary methods
179 def _put_block_async(self, data, hash, upload_gen=None):
180 event = SilentEvent(method=self._put_block, data=data, hash=hash)
184 def _put_block(self, data, hash):
185 r = self.container_post(
187 content_type='application/octet-stream',
188 content_length=len(data),
191 assert r.json[0] == hash, 'Local hash does not match server'
193 def _get_file_block_info(self, fileobj, size=None):
194 meta = self.get_container_info()
195 blocksize = int(meta['x-container-block-size'])
196 blockhash = meta['x-container-block-hash']
197 size = size if size is not None else fstat(fileobj.fileno()).st_size
198 nblocks = 1 + (size - 1) // blocksize
199 return (blocksize, blockhash, size, nblocks)
    def _create_or_get_missing_hashes(
            # NOTE(review): remaining parameters (self, obj, the hashmap
            # json, size/format flags, etc.) are on original lines missing
            # from this extract.
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
        # The (missing) object_put call PUTs the hashmap; the server
        # answers 201 when every block already exists server-side.
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        # 201 Created -> nothing missing; otherwise r.json is the list of
        # block hashes the server does not have yet
        return None if r.status_code == 201 else r.json
229 def _calculate_blocks_for_upload(
230 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
234 hash_gen = hash_cb(nblocks)
237 for i in range(nblocks):
238 block = fileobj.read(min(blocksize, size - offset))
240 hash = _pithos_hash(block, blockhash)
242 hmap[hash] = (offset, bytes)
246 msg = 'Failed to calculate uploaded blocks:'
247 ' Offset and object size do not match'
248 assert offset == size, msg
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
        # NOTE(review): many original lines are missing from this extract
        # (loop headers, fileobj.seek, exception-check heads); the
        # statements below are fragments of the real control flow.
        self._init_thread_limit()
        # look up where each missing block lives in the local file
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self._put_block_async(data, hash, upload_gen)
        unfinished = self._watch_thread_limit(flying)
        for thread in set(flying).difference(unfinished):
            failures.append(thread)
            # (continuation of a missing isinstance(...) check) — on a 502
            # from the server, shrink the pool to the current thread limit
            ClientError) and thread.exception.status == 502:
            self.POOLSIZE = self._thread_limit
            elif thread.isAlive():
            flying.append(thread)
        # drain whatever is still flying at the end
        for thread in flying:
            failures.append(thread)
        # report hashes whose upload failed so the caller can retry them
        return [failure.kwargs['hash'] for failure in failures]
            # NOTE(review): the 'def upload_object(' line and its leading
            # parameters are on original lines missing from this extract.
            content_encoding=None,
            content_disposition=None,
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream'
        self._calculate_blocks_for_upload(
        hashmap = dict(bytes=size, hashes=hashes)
        # NOTE(review): name mismatch — the helper is defined as
        # _create_or_get_missing_hashes (leading underscore); verify against
        # the complete upstream file.
        missing = self.create_or_get_missing_hashes(
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        upload_gen = upload_cb(len(missing))
        for i in range(len(missing), len(hashmap['hashes']) + 1):
        # retry loop: keep uploading until no progress is made
        sendlog.info('%s blocks missing' % len(missing))
        num_of_blocks = len(missing)
        missing = self._upload_missing_blocks(
        if num_of_blocks == len(missing):
        num_of_blocks = len(missing)
        '%s blocks failed to upload' % len(missing),
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
        # final object_put kwargs (call head missing from this extract):
        content_type=content_type,
        if_etag_match=if_etag_match,
        if_etag_not_match='*' if if_not_exist else None,
417 # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        """Return (blocksize, blockhash, total_size, hashes, map_dict) for a
        remote object; map_dict maps each block hash to the list of block
        indices where that hash occurs.
        """
        #retrieve object hashmap
        # temporarily drop data_range: the hashmap request must not be ranged
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # NOTE(review): map_dict initialization and the duplicate-hash
        # if/else branch are on original lines missing from this extract.
        for i, h in enumerate(hashmap['hashes']):
            # map_dict[h] = i CHAGE
            map_dict[h].append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
        # NOTE(review): the **args parameter line and the write/progress
        # lines of the loop body are missing from this extract.
        """Download blocks one-by-one (synchronously) and write them to dst."""
        for blockid, blockhash in enumerate(remote_hashes):
            # byte span of this block within the remote object
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # clamp the span to the user-requested range, if any
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **args)
451 def _get_block_async(self, obj, **args):
452 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
456 def _hash_from_file(self, fp, start, size, blockhash):
458 block = fp.read(size)
459 h = newhashlib(blockhash)
460 h.update(block.strip('\x00'))
461 return hexlify(h.digest())
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
        """
        # NOTE(review): the finished-thread check and the cleanup of
        # completed entries are on original lines missing from this extract.
        for i, (key, g) in enumerate(flying.items()):
            block = g.value.content
            # the same hash may occur at several offsets of the file
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently and write them into local_file.

        With resume set, blocks already present on disk (verified by hash)
        are skipped.
        """
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        # NOTE(review): initialization of 'flying' and 'offset' is on
        # original lines missing from this extract.
        blockid_dict = dict()
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            # offset within the first block when the range starts mid-block
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # convert block indices to absolute byte offsets
            blockids = [blk * blocksize for blk in blockids]
            # keep only the offsets whose on-disk content does not already
            # match this hash (resume support)
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            # count the skipped blocks as progress
            self._cb_next(len(blockids) - len(unsaved))
            self._watch_thread_limit(flying.values())
            # (continuation of a _thread2file call whose head is missing)
                flying, blockid_dict, local_file, offset,
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            start, end = _range_up(key, end, filerange)
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved
        # join the remaining threads and flush their blocks to disk
        for thread in flying.values():
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
            # NOTE(review): the 'def download_object(' line and its leading
            # parameters are on original lines missing from this extract.
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        # (fragments of the original body follow; several lines missing)
        data_range=None if range_str is None else 'bytes=%s' % range_str,
        if_none_match=if_none_match,
        if_modified_since=if_modified_since,
        if_unmodified_since=if_unmodified_since)
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(hash_list))
        # non-seekable destination -> sequential; seekable -> async blocks
        self._dump_blocks_sync(
        self._dump_blocks_async(
        # grow/trim the destination file to the exact remote size
        dst.truncate(total_size)
599 #Command Progress Bar method
600 def _cb_next(self, step=1):
601 if hasattr(self, 'progress_bar_gen'):
603 for i in xrange(step):
604 self.progress_bar_gen.next()
608 def _complete_cb(self):
611 self.progress_bar_gen.next()
    def get_object_hashmap(
            # NOTE(review): leading parameters (self, obj, version, ...) are
            # on original lines missing from this extract.
            if_modified_since=None,
            if_unmodified_since=None,
        """Retrieve the hashmap (block hashes and sizes) of a remote object.

        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes
        """
        # (the object_get call head is missing from this extract)
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        except ClientError as err:
            # 304 Not Modified / 412 Precondition Failed get special
            # handling (original handling lines missing from this extract)
            if err.status == 304 or err.status == 412:
    def set_account_group(self, group, usernames):
        """Create or update an account group.

        :param group: (str) group name

        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Delete an account group.

        :param group: (str) group name
        """
        # posting the group with an empty member list removes it
        self.account_post(update=True, groups={group: []})
669 def get_account_info(self, until=None):
671 :param until: (str) formated date
675 r = self.account_head(until=until)
676 if r.status_code == 401:
677 raise ClientError("No authorization", status=401)
680 def get_account_quota(self):
685 self.get_account_info(),
686 'X-Account-Policy-Quota',
689 def get_account_versioning(self):
694 self.get_account_info(),
695 'X-Account-Policy-Versioning',
    def get_account_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) the X-Account-Meta-* headers of the account
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
    def get_account_group(self):
        """
        :returns: (dict) the X-Account-Group-* headers of the account
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')
    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)
    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        # posting an empty value deletes the metadatum
        self.account_post(update=True, metadata={metakey: ''})
    def set_account_quota(self, quota):
        """
        :param quota: (int)
        """
        self.account_post(update=True, quota=quota)
    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)
739 def list_containers(self):
743 r = self.account_get()
746 def del_container(self, until=None, delimiter=None):
748 :param until: (str) formated date
750 :param delimiter: (str) with / empty container
752 :raises ClientError: 404 Container does not exist
754 :raises ClientError: 409 Container is not empty
756 self._assert_container()
757 r = self.container_delete(
760 success=(204, 404, 409))
761 if r.status_code == 404:
763 'Container "%s" does not exist' % self.container,
765 elif r.status_code == 409:
767 'Container "%s" is not empty' % self.container,
770 def get_container_versioning(self, container=None):
772 :param container: (str)
776 cnt_back_up = self.container
778 self.container = container or cnt_back_up
780 self.get_container_info(),
781 'X-Container-Policy-Versioning')
783 self.container = cnt_back_up
785 def get_container_limit(self, container=None):
787 :param container: (str)
791 cnt_back_up = self.container
793 self.container = container or cnt_back_up
795 self.get_container_info(),
796 'X-Container-Policy-Quota')
798 self.container = cnt_back_up
800 def get_container_info(self, until=None):
802 :param until: (str) formated date
806 :raises ClientError: 404 Container not found
809 r = self.container_head(until=until)
810 except ClientError as err:
811 err.details.append('for container %s' % self.container)
815 def get_container_meta(self, until=None):
817 :param until: (str) formated date
822 self.get_container_info(until=until),
825 def get_container_object_meta(self, until=None):
827 :param until: (str) formated date
832 self.get_container_info(until=until),
833 'X-Container-Object-Meta')
    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)
    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        # posting an empty value deletes the metadatum
        self.container_post(update=True, metadata={metakey: ''})
    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        # the underlying API calls this policy 'quota'
        self.container_post(update=True, quota=limit)
    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)
    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)
    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)
    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        # posting an empty value deletes the metadatum
        self.object_post(obj, update=True, metadata={metakey: ''})
    def publish_object(self, obj):
        """Make an object publicly accessible.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        # the x-object-public header carries a relative path; rebuild an
        # absolute URL from the scheme and netloc of base_url
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
    def unpublish_object(self, obj):
        """Withdraw public access to an object.

        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)
906 def get_object_info(self, obj, version=None):
908 :param obj: (str) remote object path
910 :param version: (str)
915 r = self.object_head(obj, version=version)
917 except ClientError as ce:
919 raise ClientError('Object %s not found' % obj, status=404)
922 def get_object_meta(self, obj, version=None):
924 :param obj: (str) remote object path
926 :param version: (str)
931 self.get_object_info(obj, version=version),
    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict) sharing permissions parsed from the
            x-object-sharing header
        """
        # NOTE(review): several original lines are missing from this
        # extract (the filter_in call head, the reply dict, loop heads);
        # the statements below are fragments.
            self.get_object_info(obj),
        # each ';'-separated item should look like 'read=a,b' or 'write=c'
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
956 def set_object_sharing(
958 read_permition=False, write_permition=False):
959 """Give read/write permisions to an object.
961 :param obj: (str) remote object path
963 :param read_permition: (list - bool) users and user groups that get
964 read permition for this object - False means all previous read
965 permissions will be removed
967 :param write_perimition: (list - bool) of users and user groups to get
968 write permition for this object - False means all previous write
969 permissions will be removed
972 perms = dict(read=read_permition or '', write=write_permition or '')
973 self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """Remove all sharing permissions from an object.

        :param obj: (str) remote object path
        """
        # setting sharing with no arguments clears both read and write
        self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_db: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # number of blocks needed to cover the whole source file
        nblocks = 1 + (filesize - 1) // blocksize
        # NOTE(review): offset initialization and the object_post call head
        # are on original lines missing from this extract.
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            # keyword arguments of the (missing) object_post append call;
            # 'bytes */*' means append at the current end of the object
            content_range='bytes */*',
            content_type='application/octet-stream',
            content_length=len(block),
1013 def truncate_object(self, obj, upto_bytes):
1015 :param obj: (str) remote object path
1017 :param upto_bytes: max number of bytes to leave on file
1022 content_range='bytes 0-%s/*' % upto_bytes,
1023 content_type='application/octet-stream',
1024 object_bytes=upto_bytes,
1025 source_object=path4url(self.container, obj))
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_db: progress.bar for uploading
        """
        # NOTE(review): the raise-statement heads and the object_post call
        # head are on original lines missing from this extract.
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            # (argument of a missing ClientError raise)
            'Range start exceeds file size',
        elif rf_size < int(end):
            # (argument of a missing ClientError raise)
            'Range end exceeds file size',
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # never read past the source file or past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            # keyword arguments of the (missing) object_post call:
            content_type='application/octet-stream',
            content_length=len(block),
            content_range='bytes %s-%s/*' % (
                start + offset + len(block) - 1),
            offset += len(block)
            # NOTE(review): the 'def copy_object(' line and some parameter
            # lines are missing from this extract.
            self, src_container, src_object, dst_container,
            source_version=None,
            source_account=None,
        """Copy an object between containers, optionally across accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        # the copy is issued against the destination container
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        # (the object_put call head is missing from this extract)
        dst_object or src_object,
        source_version=source_version,
        source_account=source_account,
        content_type=content_type,
        delimiter=delimiter)
            # NOTE(review): the 'def move_object(' line and some parameter
            # lines are missing from this extract.
            self, src_container, src_object, dst_container,
            source_account=None,
            source_version=None,
        """Move an object between containers, optionally across accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        # the move is issued against the destination container
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        # (the object_put call head is missing from this extract)
        source_account=source_account,
        source_version=source_version,
        content_type=content_type,
        delimiter=delimiter)
1161 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1162 """Get accounts that share with self.account
1166 :param marker: (str)
1170 self._assert_account()
1172 self.set_param('format', 'json')
1173 self.set_param('limit', limit, iff=limit is not None)
1174 self.set_param('marker', marker, iff=marker is not None)
1177 success = kwargs.pop('success', (200, 204))
1178 r = self.get(path, *args, success=success, **kwargs)
    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list) the 'versions' entries of the version listing
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']