1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
68 class PithosClient(PithosRestClient):
69 """Synnefo Pithos+ API client"""
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize the client; all state is kept by the REST base class.

        :param base_url: (str) Pithos+ API endpoint
        :param token: (str) user authentication token
        :param account: (str) account identifier, optional
        :param container: (str) default container, optional
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
74 def purge_container(self, container=None):
75 """Delete an empty container and destroy associated blocks
77 cnt_back_up = self.container
79 self.container = container or cnt_back_up
80 self.container_delete(until=unicode(time()))
82 self.container = cnt_back_up
    # NOTE(review): the original line numbering jumps inside this method —
    # several signature, docstring and body lines are missing from this view.
    # Only comments are added; visible fragments are kept as-is.
    def upload_object_unchunked(
            content_encoding=None,
            content_disposition=None,
        :param obj: (str) remote object path
        :param f: open file descriptor
        :param withHashFile: (bool)
        :param size: (int) size of data to upload
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        self._assert_container()
        # presumably the hash-file path: validate that f holds JSON — verify
        data = json.dumps(json.loads(data))
        raise ClientError('"%s" is not json-formated' % f.name, 1)
        msg = '"%s" is not a valid hashmap file' % f.name
        raise ClientError(msg, 1)
        # plain path: read the whole payload in one shot (no chunking)
        data = f.read(size) if size else f.read()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
    # NOTE(review): lines are missing from this view (original numbering
    # jumps); comments only, visible fragments kept as-is.
    def create_object_by_manifestation(
            content_encoding=None,
            content_disposition=None,
        :param obj: (str) remote object path
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        self._assert_container()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
        # a zero-length object whose x-object-manifest points at its parts
        manifest='%s/%s' % (self.container, obj))
184 # upload_* auxiliary methods
185 def _put_block_async(self, data, hash, upload_gen=None):
186 event = SilentEvent(method=self._put_block, data=data, hash=hash)
190 def _put_block(self, data, hash):
191 r = self.container_post(
193 content_type='application/octet-stream',
194 content_length=len(data),
197 assert r.json[0] == hash, 'Local hash does not match server'
199 def _get_file_block_info(self, fileobj, size=None, cache=None):
201 :param fileobj: (file descriptor) source
203 :param size: (int) size of data to upload from source
205 :param cache: (dict) if provided, cache container info response to
206 avoid redundant calls
208 if isinstance(cache, dict):
210 meta = cache[self.container]
212 meta = self.get_container_info()
213 cache[self.container] = meta
215 meta = self.get_container_info()
216 blocksize = int(meta['x-container-block-size'])
217 blockhash = meta['x-container-block-hash']
218 size = size if size is not None else fstat(fileobj.fileno()).st_size
219 nblocks = 1 + (size - 1) // blocksize
220 return (blocksize, blockhash, size, nblocks)
    # NOTE(review): the signature tail and the call header lines are missing
    # from this view (original numbering jumps); comments only.
    def _create_or_get_missing_hashes(
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        # 201 means the server assembled the object from blocks it already
        # holds; otherwise the response body lists the hashes still missing
        return (None if r.status_code == 201 else r.json), r.headers
250 def _calculate_blocks_for_upload(
251 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
255 hash_gen = hash_cb(nblocks)
258 for i in range(nblocks):
259 block = fileobj.read(min(blocksize, size - offset))
261 hash = _pithos_hash(block, blockhash)
263 hmap[hash] = (offset, bytes)
267 msg = 'Failed to calculate uploaded blocks:'
268 ' Offset and object size do not match'
269 assert offset == size, msg
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
        # NOTE(review): loop and try/except scaffolding lines are missing from
        # this view (original numbering jumps); comments only.
        self._init_thread_limit()
        # seek info for each missing block comes from the local hash map
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self._put_block_async(data, hash, upload_gen)
        unfinished = self._watch_thread_limit(flying)
        for thread in set(flying).difference(unfinished):
            failures.append(thread)
            # a 502 suggests server overload: shrink the thread pool
            ClientError) and thread.exception.status == 502:
            self.POOLSIZE = self._thread_limit
            elif thread.isAlive():
            flying.append(thread)
        # wait for remaining workers, collecting their failures too
        for thread in flying:
            failures.append(thread)
        # return the failed hashes so the caller can retry them
        return [failure.kwargs['hash'] for failure in failures]
    # NOTE(review): this is the body of upload_object; its `def` line and many
    # statements (try blocks, retry loop, final object_put) are missing from
    # this view (original numbering jumps). Comments only.
            content_encoding=None,
            content_disposition=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)
        :param obj: (str) remote object path
        :param f: open file descriptor (rb)
        :param hash_cb: optional progress.bar object for calculating hashes
        :param upload_cb: optional progress.bar object for uploading
        :param if_etag_match: (str) Push that value to if-match header at file
        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        self._assert_container()
        # the container's block geometry drives the client-side chunking
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream'
        self._calculate_blocks_for_upload(
        hashmap = dict(bytes=size, hashes=hashes)
        # first hashmap PUT: server replies with the blocks it lacks
        missing, obj_headers = self._create_or_get_missing_hashes(
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        upload_gen = upload_cb(len(missing))
        for i in range(len(missing), len(hashmap['hashes']) + 1):
        sendlog.info('%s blocks missing' % len(missing))
        num_of_blocks = len(missing)
        missing = self._upload_missing_blocks(
        # no progress between retries means the upload is stuck — bail out
        if num_of_blocks == len(missing):
        num_of_blocks = len(missing)
            '%s blocks failed to upload' % len(missing),
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
        # final hashmap PUT assembles the object from the uploaded blocks
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
443 # download_* auxiliary methods
444 def _get_remote_blocks_info(self, obj, **restargs):
445 #retrieve object hashmap
446 myrange = restargs.pop('data_range', None)
447 hashmap = self.get_object_hashmap(obj, **restargs)
448 restargs['data_range'] = myrange
449 blocksize = int(hashmap['block_size'])
450 blockhash = hashmap['block_hash']
451 total_size = hashmap['bytes']
452 #assert total_size/blocksize + 1 == len(hashmap['hashes'])
454 for i, h in enumerate(hashmap['hashes']):
455 # map_dict[h] = i CHAGE
457 map_dict[h].append(i)
460 return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    # NOTE(review): the signature tail and the per-block write/progress lines
    # are missing from this view (original numbering jumps); comments only.
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
        for blockid, blockhash in enumerate(remote_hashes):
            # absolute byte offset of this block inside the remote object
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # clip against the user-requested range, if any
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **args)
477 def _get_block_async(self, obj, **args):
478 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
482 def _hash_from_file(self, fp, start, size, blockhash):
484 block = fp.read(size)
485 h = newhashlib(blockhash)
486 h.update(block.strip('\x00'))
487 return hexlify(h.digest())
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
        """
        # NOTE(review): the finished-thread filtering/cleanup lines are
        # missing from this view (original numbering jumps); comments only.
        for i, (key, g) in enumerate(flying.items()):
            block = g.value.content
            # the same block content may belong at several file positions
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        # on resume, blocks already present and matching are not re-fetched
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        blockid_dict = dict()
        # NOTE(review): several scaffolding lines (offset init, inner loop
        # headers, thread-drain calls) are missing from this view (original
        # numbering jumps); comments only, fragments kept as-is.
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # translate block indices into absolute byte offsets
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            # blocks already on disk only advance the progress bar
            self._cb_next(len(blockids) - len(unsaved))
            self._watch_thread_limit(flying.values())
            flying, blockid_dict, local_file, offset,
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            start, end = _range_up(key, end, filerange)
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved
        # drain the last batch of workers and flush them to the file
        for thread in flying.values():
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
    # NOTE(review): this is the body of download_object; its `def` line and
    # several statements (restargs construction, seekable-stream branch) are
    # missing from this view (original numbering jumps). Comments only.
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path
        :param dst: open file descriptor (wb+)
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param resume: (bool) if set, preserve already downloaded file parts
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formated date
        :param if_unmodified_since: (str) formated date"""
        data_range=None if range_str is None else 'bytes=%s' % range_str,
        if_none_match=if_none_match,
        if_modified_since=if_modified_since,
        if_unmodified_since=if_unmodified_since)
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(hash_list))
        # presumably: sequential dump for non-seekable destinations, threaded
        # random-block dump otherwise — the branch lines are missing; verify
        self._dump_blocks_sync(
        self._dump_blocks_async(
        # trim any stale tail left over from a resumed download
        dst.truncate(total_size)
625 #Command Progress Bar method
626 def _cb_next(self, step=1):
627 if hasattr(self, 'progress_bar_gen'):
629 for i in xrange(step):
630 self.progress_bar_gen.next()
634 def _complete_cb(self):
637 self.progress_bar_gen.next()
    # NOTE(review): the signature tail, the try/format setup and the return
    # lines are missing from this view (original numbering jumps); comments
    # only, fragments kept as-is.
    def get_object_hashmap(
            if_modified_since=None,
            if_unmodified_since=None,
        :param obj: (str) remote object path
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formated date
        :param if_unmodified_since: (str) formated date
        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        except ClientError as err:
            # 304 not-modified / 412 precondition-failed — presumably mapped
            # to an empty hashmap by the missing lines; verify against caller
            if err.status == 304 or err.status == 412:
    def set_account_group(self, group, usernames):
        """Set the members of an account group.

        :param group: (str) group name
        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Delete an account group by emptying its member list.

        :param group: (str) group name
        """
        self.account_post(update=True, groups={group: []})
695 def get_account_info(self, until=None):
697 :param until: (str) formated date
701 r = self.account_head(until=until)
702 if r.status_code == 401:
703 raise ClientError("No authorization", status=401)
706 def get_account_quota(self):
711 self.get_account_info(),
712 'X-Account-Policy-Quota',
715 def get_account_versioning(self):
720 self.get_account_info(),
721 'X-Account-Policy-Versioning',
    def get_account_meta(self, until=None):
        """
        :meta until: (str) formated date

        :returns: (dict) the X-Account-Meta-* headers
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
    def get_account_group(self):
        """
        :returns: (dict) the X-Account-Group-* headers
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')
    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)
    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        # setting a metadatum to '' deletes it server-side
        self.account_post(update=True, metadata={metakey: ''})
    def set_account_quota(self, quota):
        """
        :param quota: (int) the new account quota
        """
        self.account_post(update=True, quota=quota)
    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)
765 def list_containers(self):
769 r = self.account_get()
772 def del_container(self, until=None, delimiter=None):
774 :param until: (str) formated date
776 :param delimiter: (str) with / empty container
778 :raises ClientError: 404 Container does not exist
780 :raises ClientError: 409 Container is not empty
782 self._assert_container()
783 r = self.container_delete(
786 success=(204, 404, 409))
787 if r.status_code == 404:
789 'Container "%s" does not exist' % self.container,
791 elif r.status_code == 409:
793 'Container "%s" is not empty' % self.container,
796 def get_container_versioning(self, container=None):
798 :param container: (str)
802 cnt_back_up = self.container
804 self.container = container or cnt_back_up
806 self.get_container_info(),
807 'X-Container-Policy-Versioning')
809 self.container = cnt_back_up
811 def get_container_limit(self, container=None):
813 :param container: (str)
817 cnt_back_up = self.container
819 self.container = container or cnt_back_up
821 self.get_container_info(),
822 'X-Container-Policy-Quota')
824 self.container = cnt_back_up
826 def get_container_info(self, until=None):
828 :param until: (str) formated date
832 :raises ClientError: 404 Container not found
835 r = self.container_head(until=until)
836 except ClientError as err:
837 err.details.append('for container %s' % self.container)
841 def get_container_meta(self, until=None):
843 :param until: (str) formated date
848 self.get_container_info(until=until),
851 def get_container_object_meta(self, until=None):
853 :param until: (str) formated date
858 self.get_container_info(until=until),
859 'X-Container-Object-Meta')
    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)
    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        # setting a metadatum to '' deletes it server-side
        self.container_post(update=True, metadata={metakey: ''})
    def set_container_limit(self, limit):
        """
        :param limit: (int) the new container quota
        """
        self.container_post(update=True, quota=limit)
    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)
    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)
    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)
    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        # setting a metadatum to '' deletes it server-side
        self.object_post(obj, update=True, metadata={metakey: ''})
    def publish_object(self, obj):
        """Make an object publicly readable and return its public URL.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        # rebuild scheme://host from base_url and append the server-assigned
        # public path from the x-object-public header
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)
932 def get_object_info(self, obj, version=None):
934 :param obj: (str) remote object path
936 :param version: (str)
941 r = self.object_head(obj, version=version)
943 except ClientError as ce:
945 raise ClientError('Object %s not found' % obj, status=404)
948 def get_object_meta(self, obj, version=None):
950 :param obj: (str) remote object path
952 :param version: (str)
957 self.get_object_info(obj, version=version),
    # NOTE(review): most of this method's body (filter_in call header, reply
    # dict construction, loop/try lines) is missing from this view (original
    # numbering jumps); comments only, fragments kept as-is.
    def get_object_sharing(self, obj):
        :param obj: (str) remote object path
        self.get_object_info(obj),
        # the x-object-sharing header packs permissions as 'key=val;key=val'
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
982 def set_object_sharing(
984 read_permition=False, write_permition=False):
985 """Give read/write permisions to an object.
987 :param obj: (str) remote object path
989 :param read_permition: (list - bool) users and user groups that get
990 read permition for this object - False means all previous read
991 permissions will be removed
993 :param write_perimition: (list - bool) of users and user groups to get
994 write permition for this object - False means all previous write
995 permissions will be removed
998 perms = dict(read=read_permition or '', write=write_permition or '')
999 self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        # resetting sharing with no arguments clears all permissions
        self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_db: progress.bar for uploading
        """
        # NOTE(review): offset init, the object_post call header and progress
        # lines are missing from this view (original numbering jumps);
        # comments only, fragments kept as-is.
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            # 'bytes */*' makes each POST append at the end of the object
            content_range='bytes */*',
            content_type='application/octet-stream',
            content_length=len(block),
1039 def truncate_object(self, obj, upto_bytes):
1041 :param obj: (str) remote object path
1043 :param upto_bytes: max number of bytes to leave on file
1048 content_range='bytes 0-%s/*' % upto_bytes,
1049 content_type='application/octet-stream',
1050 object_bytes=upto_bytes,
1051 source_object=path4url(self.container, obj))
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_db: progress.bar for uploading
        """
        # NOTE(review): the raise statements, the object_post call header,
        # offset init and progress lines are missing from this view (original
        # numbering jumps); comments only, fragments kept as-is.
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            'Range start exceeds file size',
        elif rf_size < int(end):
            'Range end exceeds file size',
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # bytes to send: both range ends are inclusive
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # never read past the source file or past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            content_type='application/octet-stream',
            content_length=len(block),
            content_range='bytes %s-%s/*' % (
                start + offset + len(block) - 1),
            offset += len(block)
1105 self, src_container, src_object, dst_container,
1107 source_version=None,
1108 source_account=None,
1113 :param src_container: (str) source container
1115 :param src_object: (str) source object path
1117 :param dst_container: (str) destination container
1119 :param dst_object: (str) destination object path
1121 :param source_version: (str) source object version
1123 :param source_account: (str) account to copy from
1125 :param public: (bool)
1127 :param content_type: (str)
1129 :param delimiter: (str)
1131 self._assert_account()
1132 self.container = dst_container
1133 src_path = path4url(src_container, src_object)
1135 dst_object or src_object,
1139 source_version=source_version,
1140 source_account=source_account,
1142 content_type=content_type,
1143 delimiter=delimiter)
1146 self, src_container, src_object, dst_container,
1148 source_account=None,
1149 source_version=None,
1154 :param src_container: (str) source container
1156 :param src_object: (str) source object path
1158 :param dst_container: (str) destination container
1160 :param dst_object: (str) destination object path
1162 :param source_account: (str) account to move from
1164 :param source_version: (str) source object version
1166 :param public: (bool)
1168 :param content_type: (str)
1170 :param delimiter: (str)
1172 self._assert_account()
1173 self.container = dst_container
1174 dst_object = dst_object or src_object
1175 src_path = path4url(src_container, src_object)
1181 source_account=source_account,
1182 source_version=source_version,
1184 content_type=content_type,
1185 delimiter=delimiter)
1187 def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1188 """Get accounts that share with self.account
1192 :param marker: (str)
1196 self._assert_account()
1198 self.set_param('format', 'json')
1199 self.set_param('limit', limit, iff=limit is not None)
1200 self.set_param('marker', marker, iff=marker is not None)
1203 success = kwargs.pop('success', (200, 204))
1204 r = self.get(path, *args, success=success, **kwargs)
    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list) the object's versions
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']