1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
# Pithos+ storage client: object upload/download, container/account/object
# metadata, sharing, versioning.  All HTTP-level calls (container_post,
# object_get, ...) come from the PithosRestClient base class.
class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        # No state of its own: everything is delegated to the REST layer
        super(PithosClient, self).__init__(base_url, token, account, container)
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks

        :param container: (str) container to purge; defaults to the
            currently selected self.container
        """
        cnt_back_up = self.container
        self.container = container or cnt_back_up
        # 'until=now' asks the server to also destroy historic versions/blocks
        self.container_delete(until=unicode(time()))
        # NOTE(review): this copy looks truncated — the restore below should
        # presumably sit in a try/finally so it also runs on errors; confirm
        # against upstream kamaki.
        self.container = cnt_back_up
    # NOTE(review): this method is truncated in this copy — parts of the
    # signature (self, obj, f, ... and the closing parenthesis), the
    # try/except around the hashmap validation and the final object_put
    # call are missing.  Restore from upstream before use.
    def upload_object_unchunked(
            content_encoding=None,
            content_disposition=None,
        """Upload an object in a single request.

        :param obj: (str) remote object path
        :param f: open file descriptor
        :param withHashFile: (bool)
        :param size: (int) size of data to upload
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        """
        self._assert_container()
        # hashmap-file path: round-trip through json to validate the input
        data = json.dumps(json.loads(data))
        raise ClientError('"%s" is not json-formated' % f.name, 1)
        msg = '"%s" is not a valid hashmap file' % f.name
        raise ClientError(msg, 1)
        # plain path: read the whole payload from the file descriptor
        data = f.read(size) if size else f.read()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
    # NOTE(review): truncated copy — the rest of the signature and the
    # object_put( opener for the trailing keyword arguments are missing.
    def create_object_by_manifestation(
            content_encoding=None,
            content_disposition=None,
        """Create a manifest object pointing at already-uploaded parts.

        :param obj: (str) remote object path
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        :returns: (dict) created object metadata
        """
        self._assert_container()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
        # X-Object-Manifest: all objects under this prefix form the content
        manifest='%s/%s' % (self.container, obj))
184 # upload_* auxiliary methods
185 def _put_block_async(self, data, hash, upload_gen=None):
186 event = SilentEvent(method=self._put_block, data=data, hash=hash)
    def _put_block(self, data, hash):
        """POST one block to the current container and verify its hash.

        :param data: block payload
        :param hash: locally computed Pithos+ hash of data
        """
        # NOTE(review): truncated copy — upstream presumably also passes
        # update=True and the block payload to container_post; confirm.
        r = self.container_post(
            content_type='application/octet-stream',
            content_length=len(data),
        # The server replies with a list of block hashes; the first must
        # equal the locally computed one, or the upload is corrupt.
        assert r.json[0] == hash, 'Local hash does not match server'
199 def _get_file_block_info(self, fileobj, size=None):
200 meta = self.get_container_info()
201 blocksize = int(meta['x-container-block-size'])
202 blockhash = meta['x-container-block-hash']
203 size = size if size is not None else fstat(fileobj.fileno()).st_size
204 nblocks = 1 + (size - 1) // blocksize
205 return (blocksize, blockhash, size, nblocks)
    # NOTE(review): truncated copy — most of the signature and the
    # object_put( opener for the keyword arguments below are missing.
    def _create_or_get_missing_hashes(
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
        # Send the full hashmap; the server either creates the object (201)
        # or answers with the list of block hashes it does not have yet.
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        # 201: object fully assembled server-side, nothing is missing
        return (None if r.status_code == 201 else r.json), r.headers
    # NOTE(review): truncated copy — offset initialization, hashes.append
    # and the offset advance inside the loop are missing here.
    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
        # Optional progress bar generator, one tick per hashed block
        hash_gen = hash_cb(nblocks)
        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            hash = _pithos_hash(block, blockhash)
            hmap[hash] = (offset, bytes)
        msg = 'Failed to calculate uploaded blocks:'
        # NOTE(review): the next line is a bare string literal — a no-op
        # statement; it was clearly meant to be concatenated into msg.
        ' Offset and object size do not match'
        assert offset == size, msg
    # NOTE(review): truncated copy — the loop over `missing`, the
    # initialization of flying/failures, fileobj.seek, thread joins and the
    # isinstance( opener for the 502 check are missing here.
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
        self._init_thread_limit()
        # Look up where each missing block lives in the local file
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self._put_block_async(data, hash, upload_gen)
        # Reap finished threads; keep the still-running ones in `flying`
        unfinished = self._watch_thread_limit(flying)
        for thread in set(flying).difference(unfinished):
            failures.append(thread)
            # On 502 the pool is likely saturated — shrink it
            ClientError) and thread.exception.status == 502:
            self.POOLSIZE = self._thread_limit
        elif thread.isAlive():
            flying.append(thread)
        # Drain whatever is still in flight
        for thread in flying:
            failures.append(thread)
        # Report the hashes that must be retried
        return [failure.kwargs['hash'] for failure in failures]
    # NOTE(review): truncated copy — the `def upload_object(` line itself,
    # several parameters, the retry loop/try blocks and the final
    # object_put call are missing; restore from upstream kamaki.
            content_encoding=None,
            content_disposition=None,
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path
        :param f: open file descriptor (rb)
        :param hash_cb: optional progress.bar object for calculating hashes
        :param upload_cb: optional progress.bar object for uploading
        :param if_etag_match: (str) Push that value to if-match header at file
        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.
        :param content_encoding: (str)
        :param content_disposition: (str)
        :param content_type: (str)
        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}
        :param public: (bool)
        """
        self._assert_container()
        # Read container block parameters and size up the transfer
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream'
        self._calculate_blocks_for_upload(
        # Ask the server which blocks it already has
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_or_get_missing_hashes(
            content_type=content_type,
            if_etag_match=if_etag_match,
            # '*' makes the PUT fail if the object already exists remotely
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        upload_gen = upload_cb(len(missing))
        for i in range(len(missing), len(hashmap['hashes']) + 1):
        sendlog.info('%s blocks missing' % len(missing))
        num_of_blocks = len(missing)
        missing = self._upload_missing_blocks(
        # No progress between retries means the upload is stuck — give up
        if num_of_blocks == len(missing):
        num_of_blocks = len(missing)
        '%s blocks failed to upload' % len(missing),
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
        content_type=content_type,
        if_etag_match=if_etag_match,
        if_etag_not_match='*' if if_not_exist else None,
424 # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch the remote hashmap of obj and index its blocks.

        :param obj: (str) remote object path
        :returns: (blocksize, blockhash, total_size, hashes, map_dict)
            where map_dict maps each block hash to its block indices
        """
        #retrieve object hashmap
        # Temporarily drop any data_range: the hashmap request must not be
        # range-limited; restore it afterwards for the block downloads.
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # NOTE(review): truncated copy — the map_dict initialization and the
        # branch creating the first-index list per hash are missing here.
        for i, h in enumerate(hashmap['hashes']):
            # map_dict[h] = i CHAGE
            # A hash can occur at several block positions; keep them all
            map_dict[h].append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    # NOTE(review): truncated copy — the rest of the signature, the args
    # dict setup and the dst.write of the fetched content are missing.
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
        # Fetch blocks one by one, in order, and write them to dst
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            # Last block may be shorter than blocksize
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # Clip the block span against any user-requested byte range
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **args)
458 def _get_block_async(self, obj, **args):
459 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
463 def _hash_from_file(self, fp, start, size, blockhash):
465 block = fp.read(size)
466 h = newhashlib(blockhash)
467 h.update(block.strip('\x00'))
468 return hexlify(h.digest())
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
        """
        # NOTE(review): truncated copy — upstream only consumes finished
        # threads here (checking thread liveness / popping from `flying`);
        # those guards are missing in this copy.
        for i, (key, g) in enumerate(flying.items()):
            block = g.value.content
            # The same block content may belong at several file positions
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
    # NOTE(review): truncated copy — the offset initialization, parts of the
    # per-hash loop body, and the final thread joins are missing here.
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently, skipping ones already on disk."""
        # When resuming, blocks already present (by hash) are not re-fetched
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        blockid_dict = dict()
        if filerange is not None:
            # Align writes when the requested range starts mid-file
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # Convert block indices to byte offsets
            blockids = [blk * blocksize for blk in blockids]
            # Keep only positions whose on-disk content does NOT already
            # hash to block_hash (those must be (re)downloaded)
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            # Advance the progress bar for blocks we could skip
            self._cb_next(len(blockids) - len(unsaved))
            self._watch_thread_limit(flying.values())
            flying, blockid_dict, local_file, offset,
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            start, end = _range_up(key, end, filerange)
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved
        # Drain the remaining in-flight downloads
        for thread in flying.values():
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
    # NOTE(review): truncated copy — the `def download_object(` line,
    # several parameters, the restargs dict construction and the sync/async
    # dispatch logic are missing; restore from upstream kamaki.
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path
        :param dst: open file descriptor (wb+)
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param resume: (bool) if set, preserve already downloaded file parts
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formated date
        :param if_unmodified_since: (str) formated date"""
        data_range=None if range_str is None else 'bytes=%s' % range_str,
        if_none_match=if_none_match,
        if_modified_since=if_modified_since,
        if_unmodified_since=if_unmodified_since)
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(hash_list))
        self._dump_blocks_sync(
        self._dump_blocks_async(
        # Trim any trailing bytes left over from a previous (larger) file
        dst.truncate(total_size)
606 #Command Progress Bar method
607 def _cb_next(self, step=1):
608 if hasattr(self, 'progress_bar_gen'):
610 for i in xrange(step):
611 self.progress_bar_gen.next()
    def _complete_cb(self):
        """Tick the progress bar on operation completion."""
        # NOTE(review): truncated copy — upstream presumably loops this call
        # until the generator is exhausted (StopIteration); confirm.
        self.progress_bar_gen.next()
    # NOTE(review): truncated copy — the rest of the signature, the
    # try/object_get( opener and the success-path return are missing.
    def get_object_hashmap(
            if_modified_since=None,
            if_unmodified_since=None,
        """Fetch the block hashmap of a remote object.

        :param obj: (str) remote object path
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formated date
        :param if_unmodified_since: (str) formated date
        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes
        """
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        except ClientError as err:
            # 304/412: conditional headers not satisfied — not fatal here
            if err.status == 304 or err.status == 412:
    def set_account_group(self, group, usernames):
        """Create or replace an account user-group.

        :param group: (str) group name
        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})

    def del_account_group(self, group):
        """Delete an account user-group.

        :param group: (str) group name
        """
        # Posting an empty member list removes the group
        self.account_post(update=True, groups={group: []})
676 def get_account_info(self, until=None):
678 :param until: (str) formated date
682 r = self.account_head(until=until)
683 if r.status_code == 401:
684 raise ClientError("No authorization", status=401)
687 def get_account_quota(self):
692 self.get_account_info(),
693 'X-Account-Policy-Quota',
696 def get_account_versioning(self):
701 self.get_account_info(),
702 'X-Account-Policy-Versioning',
    def get_account_meta(self, until=None):
        """:meta until: (str) formated date

        :returns: (dict) the X-Account-Meta- headers
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """:returns: (dict) the X-Account-Group- headers"""
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """:param metapairs: (dict) {key1:val1, key2:val2, ...}"""
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)

    def del_account_meta(self, metakey):
        """:param metakey: (str) metadatum key"""
        # Pithos+ deletes a metadatum by posting it with an empty value
        self.account_post(update=True, metadata={metakey: ''})

    def set_account_quota(self, quota):
        """:param quota: account quota limit"""
        self.account_post(update=True, quota=quota)

    def set_account_versioning(self, versioning):
        """:param versioning: (str)"""
        self.account_post(update=True, versioning=versioning)
746 def list_containers(self):
750 r = self.account_get()
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :param until: (str) formated date
        :param delimiter: (str) with / empty container
        :raises ClientError: 404 Container does not exist
        :raises ClientError: 409 Container is not empty
        """
        # NOTE(review): truncated copy — the until=/delimiter= kwargs of
        # container_delete and the raise ClientError( openers are missing.
        self._assert_container()
        r = self.container_delete(
            success=(204, 404, 409))
        # Translate the non-204 outcomes into meaningful client errors
        if r.status_code == 404:
            'Container "%s" does not exist' % self.container,
        elif r.status_code == 409:
            'Container "%s" is not empty' % self.container,
    def get_container_versioning(self, container=None):
        """Get a container's versioning policy header.

        :param container: (str)
        """
        # NOTE(review): truncated copy — upstream presumably returns
        # filter_in(...) inside try/finally so self.container is always
        # restored; the openers are missing here.
        cnt_back_up = self.container
        self.container = container or cnt_back_up
        self.get_container_info(),
        'X-Container-Policy-Versioning')
        self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """Get a container's quota policy header.

        :param container: (str)
        """
        # NOTE(review): same truncation as get_container_versioning above.
        cnt_back_up = self.container
        self.container = container or cnt_back_up
        self.get_container_info(),
        'X-Container-Policy-Quota')
        self.container = cnt_back_up

    def get_container_info(self, until=None):
        """Fetch the container header data.

        :param until: (str) formated date
        :raises ClientError: 404 Container not found
        """
        # NOTE(review): truncated copy — the try: opener, the re-raise and
        # the return of the response headers are missing here.
        r = self.container_head(until=until)
        except ClientError as err:
            # Annotate the error with the container it concerns
            err.details.append('for container %s' % self.container)
    def get_container_meta(self, until=None):
        """:param until: (str) formated date"""
        # NOTE(review): truncated copy — the return filter_in( opener and
        # the header-prefix argument are missing here.
        self.get_container_info(until=until),

    def get_container_object_meta(self, until=None):
        """:param until: (str) formated date"""
        # NOTE(review): truncated copy — the return filter_in( opener is
        # missing here.
        self.get_container_info(until=until),
        'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """:param metapairs: (dict) {key1:val1, key2:val2, ...}"""
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)

    def del_container_meta(self, metakey):
        """:param metakey: (str) metadatum key"""
        # Posting an empty value deletes the metadatum
        self.container_post(update=True, metadata={metakey: ''})

    def set_container_limit(self, limit):
        """:param limit: container quota limit"""
        self.container_post(update=True, quota=limit)

    def set_container_versioning(self, versioning):
        """:param versioning: (str)"""
        self.container_post(update=True, versioning=versioning)
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object.

        :param obj: (str) remote object path
        :param until: (str) formated date
        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)

    def set_object_meta(self, obj, metapairs):
        """Set metadata on a remote object.

        :param obj: (str) remote object path
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)

    def del_object_meta(self, obj, metakey):
        """Delete one metadatum of a remote object.

        :param obj: (str) remote object path
        :param metakey: (str) metadatum key
        """
        # Posting an empty value deletes the metadatum
        self.object_post(obj, update=True, metadata={metakey: ''})
895 def publish_object(self, obj):
897 :param obj: (str) remote object path
899 :returns: (str) access url
901 self.object_post(obj, update=True, public=True)
902 info = self.get_object_info(obj)
903 pref, sep, rest = self.base_url.partition('//')
904 base = rest.split('/')[0]
905 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
    def unpublish_object(self, obj):
        """Withdraw public access from a remote object.

        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)

    def get_object_info(self, obj, version=None):
        """Fetch the header data of a remote object.

        :param obj: (str) remote object path
        :param version: (str)
        """
        # NOTE(review): truncated copy — the try: opener, the 404 status
        # check guarding the raise, and the return of the response headers
        # are missing here.
        r = self.object_head(obj, version=version)
        except ClientError as ce:
            raise ClientError('Object %s not found' % obj, status=404)
    def get_object_meta(self, obj, version=None):
        """Fetch the metadata headers of a remote object.

        :param obj: (str) remote object path
        :param version: (str)
        """
        # NOTE(review): truncated copy — the return filter_in( opener and
        # the header-prefix argument are missing here.
        self.get_object_info(obj, version=version),

    def get_object_sharing(self, obj):
        """Parse the sharing permissions of a remote object.

        :param obj: (str) remote object path
        """
        # NOTE(review): truncated copy — the filter_in( opener, the loop
        # over the ';'-separated entries and the result dict construction
        # are missing here.
        self.get_object_info(obj),
        # x-object-sharing looks like 'read=a,b;write=c'
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
    # NOTE(review): truncated copy — the `self, obj,` line of the signature
    # is missing here.
    def set_object_sharing(
            read_permition=False, write_permition=False):
        """Give read/write permisions to an object.

        :param obj: (str) remote object path
        :param read_permition: (list - bool) users and user groups that get
            read permition for this object - False means all previous read
            permissions will be removed
        :param write_perimition: (list - bool) of users and user groups to get
            write permition for this object - False means all previous write
            permissions will be removed
        """
        # False/empty collapses to '' which clears that permission set
        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)

    def del_object_sharing(self, obj):
        """Remove all sharing permissions from an object.

        :param obj: (str) remote object path
        """
        # Calling with the defaults clears both read and write permissions
        self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the content of a local file to a remote object.

        :param obj: (str) remote object path
        :param source_file: open file descriptor
        :param upload_cb: progress.bar for uploading
        """
        # NOTE(review): truncated copy — the offset initialization and the
        # object_post( opener with its update/content arguments are missing.
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # integer ceil(filesize / blocksize)
        nblocks = 1 + (filesize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            offset += len(block)
            # 'bytes */*' appends at the current end of the object
            content_range='bytes */*',
            content_type='application/octet-stream',
            content_length=len(block),
    def truncate_object(self, obj, upto_bytes):
        """Truncate a remote object to a given size.

        :param obj: (str) remote object path
        :param upto_bytes: max number of bytes to leave on file
        """
        # NOTE(review): truncated copy — the object_post( opener with the
        # update argument is missing here.
        # Truncation = rewrite the object from itself, limited to the range
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path
        :param start: (int) position in bytes to start overwriting from
        :param end: (int) position in bytes to stop overwriting at
        :param source_file: open file descriptor
        :param upload_cb: progress.bar for uploading
        """
        # NOTE(review): truncated copy — the raise ClientError( openers, the
        # offset initialization and the object_post( opener are missing.
        # Validate the requested range against the remote object size
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            'Range start exceeds file size',
        elif rf_size < int(end):
            'Range end exceeds file size',
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Range is inclusive at both ends
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # Never read past the local file nor past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            content_type='application/octet-stream',
            content_length=len(block),
            content_range='bytes %s-%s/*' % (
                start + offset + len(block) - 1),
            offset += len(block)
    # NOTE(review): truncated copy — the `def copy_object(` line, part of
    # the signature and the self.object_put( opener are missing here.
            self, src_container, src_object, dst_container,
            source_version=None,
            source_account=None,
        """Copy an object between containers/accounts.

        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_version: (str) source object version
        :param source_account: (str) account to copy from
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        """
        self._assert_account()
        # The PUT targets the destination container
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        dst_object or src_object,
        source_version=source_version,
        source_account=source_account,
        content_type=content_type,
        delimiter=delimiter)
    # NOTE(review): truncated copy — the `def move_object(` line, part of
    # the signature and the self.object_put( opener (with move semantics)
    # are missing here.
            self, src_container, src_object, dst_container,
            source_account=None,
            source_version=None,
        """Move an object between containers/accounts.

        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_account: (str) account to move from
        :param source_version: (str) source object version
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        """
        self._assert_account()
        # The PUT targets the destination container
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        source_account=source_account,
        source_version=source_version,
        content_type=content_type,
        delimiter=delimiter)
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)
        :param marker: (str)
        """
        # NOTE(review): truncated copy — the `path` assignment and the
        # return of the parsed response are missing here.
        self._assert_account()

        self.set_param('format', 'json')
        # iff: only send the query parameter when a value was supplied
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        # Allow callers to override the accepted status codes
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
1188 def get_object_versionlist(self, obj):
1190 :param obj: (str) remote object path
1194 self._assert_container()
1195 r = self.object_get(obj, format='json', version='list')
1196 return r.json['versions']