1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
55 def _range_up(start, end, a_range):
57 (rstart, rend) = a_range.split('-')
58 (rstart, rend) = (int(rstart), int(rend))
59 if rstart > end or rend < start:
class PithosClient(PithosRestClient):
    """GRNet Pithos API client"""
    def __init__(self, base_url, token, account=None, container=None):
        # All connection state (endpoint, token, default account and
        # container) is held by the REST-level base class.
        super(PithosClient, self).__init__(base_url, token, account, container)
74 def purge_container(self, container=None):
75 """Delete an empty container and destroy associated blocks
77 cnt_back_up = self.container
79 self.container = container or cnt_back_up
80 self.container_delete(until=unicode(time()))
82 self.container = cnt_back_up
    def upload_object_unchunked(
            # NOTE(review): the 'self, obj, f, ...' parameter lines, the
            # docstring delimiters and the try/except + object_put scaffolding
            # appear to be missing from this copy of the source
            content_encoding=None,
            content_disposition=None,
            :param obj: (str) remote object path
            :param f: open file descriptor
            :param withHashFile: (bool)
            :param size: (int) size of data to upload
            :param content_encoding: (str)
            :param content_disposition: (str)
            :param content_type: (str)
            :param sharing: {'read':[user and/or grp names],
                'write':[usr and/or grp names]}
            :param public: (bool)
        self._assert_container()
        # hashmap uploads: round-trip through json to normalize and validate
        data = json.dumps(json.loads(data))
        raise ClientError('"%s" is not json-formated' % f.name, 1)
        msg = '"%s" is not a valid hashmap file' % f.name
        raise ClientError(msg, 1)
        # plain upload: read at most `size` bytes (or the whole file)
        data = f.read(size) if size else f.read()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
    def create_object_by_manifestation(
            # NOTE(review): the 'self, obj, ...' parameter lines, docstring
            # delimiters and the object_put call head are missing from this
            # copy of the source
            content_encoding=None,
            content_disposition=None,
            :param obj: (str) remote object path
            :param content_encoding: (str)
            :param content_disposition: (str)
            :param content_type: (str)
            :param sharing: {'read':[user and/or grp names],
                'write':[usr and/or grp names]}
            :param public: (bool)
        self._assert_container()
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            # an X-Object-Manifest of '<container>/<obj>' turns the new
            # object into a manifestation of all parts with that prefix
            manifest='%s/%s' % (self.container, obj))
178 # upload_* auxiliary methods
179 def _put_block_async(self, data, hash, upload_gen=None):
180 event = SilentEvent(method=self._put_block, data=data, hash=hash)
184 def _put_block(self, data, hash):
185 r = self.container_post(
187 content_type='application/octet-stream',
188 content_length=len(data),
191 assert r.json[0] == hash, 'Local hash does not match server'
193 def _get_file_block_info(self, fileobj, size=None):
194 meta = self.get_container_info()
195 blocksize = int(meta['x-container-block-size'])
196 blockhash = meta['x-container-block-hash']
197 size = size if size is not None else fstat(fileobj.fileno()).st_size
198 nblocks = 1 + (size - 1) // blocksize
199 return (blocksize, blockhash, size, nblocks)
    def _get_missing_hashes(
            # NOTE(review): most of this signature (self, obj, json, size,
            # etc.) and the object_put call head are missing from this copy
            # of the source
            content_encoding=None,
            content_disposition=None,
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        # 201 means the server already has every block; otherwise the reply
        # json lists the hashes of the blocks that still must be uploaded
        return None if r.status_code == 201 else r.json
    def _culculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            # NOTE(review): the hash_cb guard, offset initialization and the
            # loop bookkeeping lines are missing from this copy of the source
        hash_gen = hash_cb(nblocks)
        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            hash = _pithos_hash(block, blockhash)
            # remember where each hash's data lives in the local file
            hmap[hash] = (offset, bytes)
        msg = 'Failed to calculate uploaded blocks:'
        # NOTE(review): the next string literal is a no-op expression -- it
        # was presumably meant to be concatenated into msg
        ' Offset and object size do not match'
        assert offset == size, msg
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""
        # NOTE(review): several lines are missing from this copy (failures /
        # flying list initialization, fileobj.seek, the loop over `missing`,
        # exception-type checks and thread joins)
        self._init_thread_limit()
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self._put_block_async(data, hash, upload_gen)
        unfinished = self._watch_thread_limit(flying)
        for thread in set(flying).difference(unfinished):
            failures.append(thread)
            ClientError) and thread.exception.status == 502:
            # a 502 suggests server overload: keep the throttled pool size
            self.POOLSIZE = self._thread_limit
        elif thread.isAlive():
            flying.append(thread)
        for thread in flying:
            failures.append(thread)
        # report failed block hashes so the caller can retry them
        return [failure.kwargs['hash'] for failure in failures]
            # NOTE(review): the 'def upload_object(self, obj, f, ...)'
            # signature head is missing from this copy of the source
            content_encoding=None,
            content_disposition=None,
        """Upload an object using multiple connections (threads)
            :param obj: (str) remote object path
            :param f: open file descriptor (rb)
            :param hash_cb: optional progress.bar object for calculating hashes
            :param upload_cb: optional progress.bar object for uploading
            :param content_encoding: (str)
            :param content_disposition: (str)
            :param content_type: (str)
            :param sharing: {'read':[user and/or grp names],
                'write':[usr and/or grp names]}
            :param public: (bool)
        self._assert_container()
        # chained assignment: keep the tuple and its parts
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream'
        self._culculate_blocks_for_upload(
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        upload_gen = upload_cb(len(missing))
        for i in range(len(missing), len(hashmap['hashes']) + 1):
        sendlog.info('%s blocks missing' % len(missing))
        num_of_blocks = len(missing)
        missing = self._upload_missing_blocks(
        # no progress between retries means the upload is stuck
        if num_of_blocks == len(missing):
        num_of_blocks = len(missing)
        '%s blocks failed to upload' % len(missing),
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
            content_type=content_type,
399 # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        # a data_range restriction must not apply to the hashmap request
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # NOTE(review): the map_dict initialization and the membership check
        # inside the loop appear to be missing from this copy of the source
        for i, h in enumerate(hashmap['hashes']):
            # map_dict[h] = i CHAGE
            # a hash may occur in several blocks, so map it to a list of ids
            map_dict[h].append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            # NOTE(review): remaining keyword parameters and some body lines
            # (args setup, dst.write, progress callbacks) are missing from
            # this copy of the source
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            # `end` is inclusive, hence the -1
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # clip the block span against the user-requested range
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            r = self.object_get(obj, success=(200, 206), **args)
433 def _get_block_async(self, obj, **args):
434 event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
438 def _hash_from_file(self, fp, start, size, blockhash):
440 block = fp.read(size)
441 h = newhashlib(blockhash)
442 h.update(block.strip('\x00'))
443 return hexlify(h.digest())
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        # NOTE(review): the docstring closing quotes and several body lines
        # (finished-thread filtering, progress callbacks, flush) are missing
        # from this copy of the source
        for i, (key, g) in enumerate(flying.items()):
            block = g.value.content
            # the same downloaded content may belong to several positions
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        # NOTE(review): lines are missing throughout this method (offset
        # initialization, the inner loop over unsaved block offsets, the
        # _thread2file call head, thread joins)
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        blockid_dict = dict()
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            # shift block positions when the range does not start on a
            # block boundary
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # convert block indices to byte offsets
            blockids = [blk * blocksize for blk in blockids]
            # on resume, skip blocks already on disk with matching content
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            self._watch_thread_limit(flying.values())
            flying, blockid_dict, local_file, offset,
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            start, end = _range_up(key, end, filerange)
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved
        for thread in flying.values():
            # NOTE(review): the thread.join() line is presumably missing here
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
            # NOTE(review): the 'def download_object(self, obj, dst, ...)'
            # signature head is missing from this copy of the source
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)
        :param obj: (str) remote object path
        :param dst: open file descriptor (wb+)
        :param download_cb: optional progress.bar object for downloading
        :param version: (str) file version
        :param resume: (bool) if set, preserve already downloaded file parts
        :param range_str: (str) from, to are file positions (int) in bytes
        :param if_match: (str)
        :param if_none_match: (str)
        :param if_modified_since: (str) formated date
        :param if_unmodified_since: (str) formated date"""
        # NOTE(review): the restargs dict construction and the sync/async
        # dispatch logic are partially missing below
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(hash_list))
        self._dump_blocks_sync(
        self._dump_blocks_async(
        dst.truncate(total_size)
    # Command progress bar methods
582 def _cb_next(self, step=1):
583 if hasattr(self, 'progress_bar_gen'):
585 self.progress_bar_gen.next(step)
    def _complete_cb(self):
        # NOTE(review): body lines are missing from this copy -- 'step' is
        # undefined in the visible code; upstream drives the progress bar to
        # completion in a loop here
        self.progress_bar_gen.next(step)
    def get_object_hashmap(
            # NOTE(review): the 'self, obj, ...' parameter lines, docstring
            # delimiters, the try/object_get call head and the except branch
            # tail are missing from this copy of the source
            if_modified_since=None,
            if_unmodified_since=None,
            :param obj: (str) remote object path
            :param if_match: (str)
            :param if_none_match: (str)
            :param if_modified_since: (str) formated date
            :param if_unmodified_since: (str) formated date
            :param data_range: (str) from-to where from and to are integers
                denoting file positions in bytes
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            # 304/412: the precondition headers matched -- not a failure
            if err.status == 304 or err.status == 412:
636 def set_account_group(self, group, usernames):
640 :param usernames: (list)
642 self.account_post(update=True, groups={group: usernames})
644 def del_account_group(self, group):
648 self.account_post(update=True, groups={group: []})
650 def get_account_info(self, until=None):
652 :param until: (str) formated date
656 r = self.account_head(until=until)
657 if r.status_code == 401:
658 raise ClientError("No authorization", status=401)
661 def get_account_quota(self):
666 self.get_account_info(),
667 'X-Account-Policy-Quota',
670 def get_account_versioning(self):
675 self.get_account_info(),
676 'X-Account-Policy-Versioning',
679 def get_account_meta(self, until=None):
681 :meta until: (str) formated date
685 return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
687 def get_account_group(self):
691 return filter_in(self.get_account_info(), 'X-Account-Group-')
693 def set_account_meta(self, metapairs):
695 :param metapairs: (dict) {key1:val1, key2:val2, ...}
697 assert(type(metapairs) is dict)
698 self.account_post(update=True, metadata=metapairs)
700 def del_account_meta(self, metakey):
702 :param metakey: (str) metadatum key
704 self.account_post(update=True, metadata={metakey: ''})
706 def set_account_quota(self, quota):
710 self.account_post(update=True, quota=quota)
712 def set_account_versioning(self, versioning):
714 "param versioning: (str)
716 self.account_post(update=True, versioning=versioning)
718 def list_containers(self):
722 r = self.account_get()
725 def del_container(self, until=None, delimiter=None):
727 :param until: (str) formated date
729 :param delimiter: (str) with / empty container
731 :raises ClientError: 404 Container does not exist
733 :raises ClientError: 409 Container is not empty
735 self._assert_container()
736 r = self.container_delete(
739 success=(204, 404, 409))
740 if r.status_code == 404:
742 'Container "%s" does not exist' % self.container,
744 elif r.status_code == 409:
746 'Container "%s" is not empty' % self.container,
749 def get_container_versioning(self, container=None):
751 :param container: (str)
755 cnt_back_up = self.container
757 self.container = container or cnt_back_up
759 self.get_container_info(),
760 'X-Container-Policy-Versioning')
762 self.container = cnt_back_up
764 def get_container_quota(self, container=None):
766 :param container: (str)
770 cnt_back_up = self.container
772 self.container = container or cnt_back_up
774 self.get_container_info(),
775 'X-Container-Policy-Quota')
777 self.container = cnt_back_up
779 def get_container_info(self, until=None):
781 :param until: (str) formated date
785 :raises ClientError: 404 Container not found
788 r = self.container_head(until=until)
789 except ClientError as err:
790 err.details.append('for container %s' % self.container)
794 def get_container_meta(self, until=None):
796 :param until: (str) formated date
801 self.get_container_info(until=until),
804 def get_container_object_meta(self, until=None):
806 :param until: (str) formated date
811 self.get_container_info(until=until),
812 'X-Container-Object-Meta')
814 def set_container_meta(self, metapairs):
816 :param metapairs: (dict) {key1:val1, key2:val2, ...}
818 assert(type(metapairs) is dict)
819 self.container_post(update=True, metadata=metapairs)
821 def del_container_meta(self, metakey):
823 :param metakey: (str) metadatum key
825 self.container_post(update=True, metadata={metakey: ''})
827 def set_container_quota(self, quota):
831 self.container_post(update=True, quota=quota)
833 def set_container_versioning(self, versioning):
835 :param versioning: (str)
837 self.container_post(update=True, versioning=versioning)
839 def del_object(self, obj, until=None, delimiter=None):
841 :param obj: (str) remote object path
843 :param until: (str) formated date
845 :param delimiter: (str)
847 self._assert_container()
848 self.object_delete(obj, until=until, delimiter=delimiter)
850 def set_object_meta(self, obj, metapairs):
852 :param obj: (str) remote object path
854 :param metapairs: (dict) {key1:val1, key2:val2, ...}
856 assert(type(metapairs) is dict)
857 self.object_post(obj, update=True, metadata=metapairs)
859 def del_object_meta(self, obj, metakey):
861 :param obj: (str) remote object path
863 :param metakey: (str) metadatum key
865 self.object_post(obj, update=True, metadata={metakey: ''})
867 def publish_object(self, obj):
869 :param obj: (str) remote object path
871 :returns: (str) access url
873 self.object_post(obj, update=True, public=True)
874 info = self.get_object_info(obj)
875 pref, sep, rest = self.base_url.partition('//')
876 base = rest.split('/')[0]
877 return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
879 def unpublish_object(self, obj):
881 :param obj: (str) remote object path
883 self.object_post(obj, update=True, public=False)
885 def get_object_info(self, obj, version=None):
887 :param obj: (str) remote object path
889 :param version: (str)
894 r = self.object_head(obj, version=version)
896 except ClientError as ce:
898 raise ClientError('Object %s not found' % obj, status=404)
901 def get_object_meta(self, obj, version=None):
903 :param obj: (str) remote object path
905 :param version: (str)
910 self.get_object_info(obj, version=version),
    def get_object_sharing(self, obj):
        # NOTE(review): several lines are missing from this copy (docstring
        # delimiters, the filter_in call head, the reply-dict initialization
        # and the per-permission loop header/checks)
        self.get_object_info(obj),
        # x-object-sharing is a ';'-separated list of 'key=value' entries
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
935 def set_object_sharing(
937 read_permition=False, write_permition=False):
938 """Give read/write permisions to an object.
940 :param obj: (str) remote object path
942 :param read_permition: (list - bool) users and user groups that get
943 read permition for this object - False means all previous read
944 permissions will be removed
946 :param write_perimition: (list - bool) of users and user groups to get
947 write permition for this object - False means all previous write
948 permissions will be removed
951 perms = dict(read=read_permition or '', write=write_permition or '')
952 self.object_post(obj, update=True, permissions=perms)
954 def del_object_sharing(self, obj):
956 :param obj: (str) remote object path
958 self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        # NOTE(review): docstring delimiters, the 'offset' initialization and
        # the object_post call head/tail are missing from this copy
        :param obj: (str) remote object path
        :param source_file: open file descriptor
        :param upload_db: progress.bar for uploading
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # ceiling division: blocks needed to hold filesize bytes
        nblocks = 1 + (filesize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            # 'bytes */*' appends the payload at the end of the object
            content_range='bytes */*',
            content_type='application/octet-stream',
            content_length=len(block),
992 def truncate_object(self, obj, upto_bytes):
994 :param obj: (str) remote object path
996 :param upto_bytes: max number of bytes to leave on file
1001 content_range='bytes 0-%s/*' % upto_bytes,
1002 content_type='application/octet-stream',
1003 object_bytes=upto_bytes,
1004 source_object=path4url(self.container, obj))
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file
        # NOTE(review): docstring delimiters, the raise-ClientError call
        # heads, 'offset' initialization and the object_post call lines are
        # missing from this copy of the source
        :param obj: (str) remote object path
        :param start: (int) position in bytes to start overwriting from
        :param end: (int) position in bytes to stop overwriting at
        :param source_file: open file descriptor
        :param upload_db: progress.bar for uploading
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            'Range start exceeds file size',
        elif rf_size < int(end):
            'Range end exceeds file size',
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # range bounds are inclusive, hence the +1
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # never read past the source file or the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            content_type='application/octet-stream',
            content_length=len(block),
            content_range='bytes %s-%s/*' % (
                start + offset + len(block) - 1),
            offset += len(block)
            # NOTE(review): the 'def copy_object(' signature head, docstring
            # delimiters and the object copy call head are missing from this
            # copy of the source
            self, src_container, src_object, dst_container,
            source_version=None,
            source_account=None,
        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_version: (str) source object version
        :param source_account: (str) account to copy from
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        self._assert_account()
        # the copy lands in the destination container
        self.container = dst_container
        src_path = path4url(src_container, src_object)
            # destination defaults to the source object name
            dst_object or src_object,
            source_version=source_version,
            source_account=source_account,
            content_type=content_type,
            delimiter=delimiter)
            # NOTE(review): the 'def move_object(' signature head, docstring
            # delimiters and the object move call head are missing from this
            # copy of the source
            self, src_container, src_object, dst_container,
            source_account=None,
            source_version=None,
        :param src_container: (str) source container
        :param src_object: (str) source object path
        :param dst_container: (str) destination container
        :param dst_object: (str) destination object path
        :param source_account: (str) account to move from
        :param source_version: (str) source object version
        :param public: (bool)
        :param content_type: (str)
        :param delimiter: (str)
        self._assert_account()
        # the moved object lands in the destination container
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
            source_account=source_account,
            source_version=source_version,
            content_type=content_type,
            delimiter=delimiter)
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param marker: (str)
        # NOTE(review): the docstring closing quotes, the 'path' assignment
        # and the return statement are missing from this copy of the source
        self._assert_account()
        self.set_param('format', 'json')
        # only send pagination params that were actually given
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
1160 def get_object_versionlist(self, obj):
1162 :param obj: (str) remote object path
1166 self._assert_container()
1167 r = self.object_get(obj, format='json', version='list')
1168 return r.json['versions']