1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
34 from threading import enumerate as activethreads
37 from hashlib import new as newhashlib
40 from binascii import hexlify
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
49 def _pithos_hash(block, blockhash):
50 h = newhashlib(blockhash)
51 h.update(block.rstrip('\x00'))
def _range_up(start, end, a_range):
    """Clip the block span [start, end] against a 'from-to' range string."""
    (rstart, rend) = a_range.split('-')
    (rstart, rend) = (int(rstart), int(rend))
    # no overlap between the requested range and this block
    if rstart > end or rend < start:
class PithosClient(PithosRestClient):
    """GRNet Pithos API client"""

    def __init__(self, base_url, token, account=None, container=None):
        """
        :param base_url: (str) Pithos service endpoint
        :param token: (str) user authentication token
        :param account: (str) account to operate on
        :param container: (str) default container for object operations
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks

        :param container: (str) if given, act on it instead of self.container
        """
        cnt_back_up = self.container
        self.container = container or cnt_back_up
        # until=now also destroys the container history (and its blocks)
        self.container_delete(until=unicode(time()))
        # restore the client's default container
        self.container = cnt_back_up
    def upload_object_unchunked(
            content_encoding=None,
            content_disposition=None,
        """Upload an object with a single request (no block splitting)

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        # hash-file mode: content must parse as json (round-trip normalizes it)
        data = json.dumps(json.loads(data))
        raise ClientError('"%s" is not json-formated' % f.name, 1)
        msg = '"%s" is not a valid hashmap file' % f.name
        raise ClientError(msg, 1)
        # plain mode: read the whole payload (or up to size) into memory
        data = f.read(size) if size else f.read()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
    def create_object_by_manifestation(
            content_encoding=None,
            content_disposition=None,
        """Create a remote object composed of already-uploaded parts

        :param obj: (str) remote object path

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()
        content_encoding=content_encoding,
        content_disposition=content_disposition,
        content_type=content_type,
        # all objects under the manifest prefix form the new object's content
        manifest='%s/%s' % (self.container, obj))
    # upload_* auxiliary methods

    def _put_block_async(self, data, hash, upload_gen=None):
        """Spawn a thread that uploads one block; return the thread object."""
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
    def _put_block(self, data, hash):
        """Upload a single raw block and verify the server-computed hash."""
        r = self.container_post(
            content_type='application/octet-stream',
            content_length=len(data),
        # the server echoes the hashes of the blocks it stored
        assert r.json[0] == hash, 'Local hash does not match server'
193 def _get_file_block_info(self, fileobj, size=None):
194 meta = self.get_container_info()
195 blocksize = int(meta['x-container-block-size'])
196 blockhash = meta['x-container-block-hash']
197 size = size if size is not None else fstat(fileobj.fileno()).st_size
198 nblocks = 1 + (size - 1) // blocksize
199 return (blocksize, blockhash, size, nblocks)
    def _get_missing_hashes(
            content_encoding=None,
            content_disposition=None,
        """Send the hashmap first; ask which block hashes the server lacks."""
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
        # 201: object was assembled entirely from known blocks - nothing missing
        return None if r.status_code == 201 else r.json
227 def _culculate_blocks_for_upload(
228 self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
232 hash_gen = hash_cb(nblocks)
235 for i in range(nblocks):
236 block = fileobj.read(min(blocksize, size - offset))
238 hash = _pithos_hash(block, blockhash)
240 hmap[hash] = (offset, bytes)
244 msg = 'Failed to calculate uploaded blocks:'
245 ' Offset and object size do not match'
246 assert offset == size, msg
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """Upload missing blocks asynchronously.

        :param missing: hashes of blocks the server does not store yet
        :param hmap: (dict) hash --> (offset, length) into fileobj
        :param fileobj: open file descriptor with the object data
        :param upload_gen: optional progress generator
        """
        self._init_thread_limit()
        offset, bytes = hmap[hash]
        data = fileobj.read(bytes)
        r = self._put_block_async(data, hash, upload_gen)
        # throttle: block until the live-thread count drops below the limit
        unfinished = self._watch_thread_limit(flying)
        for thread in set(flying).difference(unfinished):
        failures.append(thread)
        # a 502 suggests server overload: shrink the thread pool
        ClientError) and thread.exception.status == 502:
        self.POOLSIZE = self._thread_limit
        elif thread.isAlive():
        flying.append(thread)
        # drain any threads still in flight
        for thread in flying:
        failures.append(thread)
        # the caller retries exactly these hashes
        return [failure.kwargs['hash'] for failure in failures]
            content_encoding=None,
            content_disposition=None,
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)
        """
        self._assert_container()

        # the container block policy decides how the local file is split
        block_info = (blocksize, blockhash, size, nblocks) =\
            self._get_file_block_info(f, size)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = 'application/octet-stream'

        self._culculate_blocks_for_upload(
        # first try the hashmap alone; the server answers with the hashes
        # of blocks it does not already have
        hashmap = dict(bytes=size, hashes=hashes)
        missing = self._get_missing_hashes(
            content_type=content_type,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
        upload_gen = upload_cb(len(missing))
        for i in range(len(missing), len(hashmap['hashes']) + 1):
        sendlog.info('%s blocks missing' % len(missing))
        num_of_blocks = len(missing)
        missing = self._upload_missing_blocks(
        # no progress between retries means the upload is stuck: give up
        if num_of_blocks == len(missing):
        num_of_blocks = len(missing)
        '%s blocks failed to upload' % len(missing),
        except KeyboardInterrupt:
        sendlog.info('- - - wait for threads to finish')
        for thread in activethreads():
        content_type=content_type,
    # download_* auxiliary methods

    def _get_remote_blocks_info(self, obj, **restargs):
        """Return (blocksize, blockhash, total_size, hashes, hash->positions).

        The same hash may occur at several positions in the object, hence
        map_dict maps each hash to a list of block indices.
        """
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        for i, h in enumerate(hashmap['hashes']):
            # map_dict[h] = i CHANGE
            map_dict[h].append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
        """Download the object's blocks one by one and write them to dst."""
        for blockid, blockhash in enumerate(remote_hashes):
            start = blocksize * blockid
            # the final block may be shorter than blocksize
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # clip the block span against the user-requested range
            (start, end) = _range_up(start, end, range)
            args['data_range'] = 'bytes=%s-%s' % (start, end)
            # 206 is the partial-content reply for ranged GETs
            r = self.object_get(obj, success=(200, 206), **args)
    def _get_block_async(self, obj, **args):
        """Spawn a thread that GETs one block of obj; return the thread."""
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
    def _hash_from_file(self, fp, start, size, blockhash):
        """Hash a size-byte block of fp with the container's algorithm."""
        block = fp.read(size)
        h = newhashlib(blockhash)
        # NOTE(review): strips NULs from BOTH ends, while _pithos_hash uses
        # rstrip() (trailing only) - confirm this asymmetry is intended
        h.update(block.strip('\x00'))
        return hexlify(h.digest())
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
        """
        for i, (key, g) in enumerate(flying.items()):
            block = g.value.content
            # the same block content may appear at several object positions
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently, skipping those already on disk."""
        # when resuming, already-written data is detected by re-hashing it
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        blockid_dict = dict()
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            # offset of the requested range within its first block
            offset = rstart if blocksize > rstart else rstart % blocksize
        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # turn block indices into byte offsets
            blockids = [blk * blocksize for blk in blockids]
            # keep only positions whose on-disk content does not match
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            # count the skipped (already saved) blocks as progress
            self._cb_next(len(blockids) - len(unsaved))
            self._watch_thread_limit(flying.values())
            flying, blockid_dict, local_file, offset,
            end = total_size - 1 if key + blocksize > total_size\
                else key + blocksize - 1
            start, end = _range_up(key, end, filerange)
            restargs['async_headers'] = {
                'Range': 'bytes=%s-%s' % (start, end)}
            flying[key] = self._get_block_async(obj, **restargs)
            blockid_dict[key] = unsaved
        # wait for the remaining threads and flush their blocks to the file
        for thread in flying.values():
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        data_range=None if range_str is None else 'bytes=%s' % range_str,
        if_none_match=if_none_match,
        if_modified_since=if_modified_since,
        if_unmodified_since=if_unmodified_since)
        # fetch the object's hashmap to learn size and block layout
        remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0
        self.progress_bar_gen = download_cb(len(hash_list))
        # non-seekable destinations get the sequential path, seekable the async
        self._dump_blocks_sync(
        self._dump_blocks_async(
        # drop any stale data past the object's end (relevant on resume)
        dst.truncate(total_size)
    #Command Progress Bar method
    def _cb_next(self, step=1):
        """Advance the attached progress bar by step, if one is set."""
        if hasattr(self, 'progress_bar_gen'):
            for i in xrange(step):
                self.progress_bar_gen.next()
    def _complete_cb(self):
        """Exhaust the progress bar generator to mark completion."""
        self.progress_bar_gen.next()
    def get_object_hashmap(
            if_modified_since=None,
            if_unmodified_since=None,
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes
        """
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since,
            data_range=data_range)
        except ClientError as err:
        # 304 (not modified) / 412 (precondition failed) get special handling
        if err.status == 304 or err.status == 412:
    def set_account_group(self, group, usernames):
        """Create or update an account group.

        :param group: (str) group name
        :param usernames: (list)
        """
        self.account_post(update=True, groups={group: usernames})
    def del_account_group(self, group):
        """Delete an account group.

        :param group: (str) group name
        """
        # posting an empty member list removes the group
        self.account_post(update=True, groups={group: []})
    def get_account_info(self, until=None):
        """
        :param until: (str) formated date

        :raises ClientError: 401 No authorization
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
    def get_account_quota(self):
        """Return the account quota policy headers."""
        self.get_account_info(),
        'X-Account-Policy-Quota',
    def get_account_versioning(self):
        """Return the account versioning policy headers."""
        self.get_account_info(),
        'X-Account-Policy-Versioning',
    def get_account_meta(self, until=None):
        """
        :meta until: (str) formated date
        """
        # keep only the user-defined metadata headers
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
    def get_account_group(self):
        """Return the account group headers."""
        return filter_in(self.get_account_info(), 'X-Account-Group-')
    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.account_post(update=True, metadata=metapairs)
    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        # posting an empty value removes the metadatum
        self.account_post(update=True, metadata={metakey: ''})
    def set_account_quota(self, quota):
        """
        :param quota: (int) the new account quota, in bytes
        """
        self.account_post(update=True, quota=quota)
    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.account_post(update=True, versioning=versioning)
    def list_containers(self):
        """List the containers of the current account."""
        r = self.account_get()
    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            success=(204, 404, 409))
        # 404/409 are accepted above so they can be re-raised with context
        if r.status_code == 404:
        'Container "%s" does not exist' % self.container,
        elif r.status_code == 409:
        'Container "%s" is not empty' % self.container,
    def get_container_versioning(self, container=None):
        """
        :param container: (str) if given, act on it instead of self.container
        """
        cnt_back_up = self.container
        self.container = container or cnt_back_up
        self.get_container_info(),
        'X-Container-Policy-Versioning')
        # restore the client's default container
        self.container = cnt_back_up
    def get_container_quota(self, container=None):
        """
        :param container: (str) if given, act on it instead of self.container
        """
        cnt_back_up = self.container
        self.container = container or cnt_back_up
        self.get_container_info(),
        'X-Container-Policy-Quota')
        # restore the client's default container
        self.container = cnt_back_up
    def get_container_info(self, until=None):
        """
        :param until: (str) formated date

        :raises ClientError: 404 Container not found
        """
        r = self.container_head(until=until)
        except ClientError as err:
        # enrich the error with the container name before re-raising
        err.details.append('for container %s' % self.container)
    def get_container_meta(self, until=None):
        """
        :param until: (str) formated date
        """
        self.get_container_info(until=until),
    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formated date
        """
        self.get_container_info(until=until),
        'X-Container-Object-Meta')
    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.container_post(update=True, metadata=metapairs)
    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        # posting an empty value removes the metadatum
        self.container_post(update=True, metadata={metakey: ''})
    def set_container_quota(self, quota):
        """
        :param quota: (int) the new container quota, in bytes
        """
        self.container_post(update=True, quota=quota)
    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        self.container_post(update=True, versioning=versioning)
    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)
        """
        self._assert_container()
        self.object_delete(obj, until=until, delimiter=delimiter)
    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        self.object_post(obj, update=True, metadata=metapairs)
    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        # posting an empty value removes the metadatum
        self.object_post(obj, update=True, metadata={metakey: ''})
    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        # rebuild the public URL: scheme + host of base_url + public path
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.object_post(obj, update=True, public=False)
    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :raises ClientError: 404 Object not found
        """
        r = self.object_head(obj, version=version)
        except ClientError as ce:
        raise ClientError('Object %s not found' % obj, status=404)
    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)
        """
        self.get_object_info(obj, version=version),
    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        self.get_object_info(obj),
        # the header value is a ';'-separated list of key=value permissions
        perms = r['x-object-sharing'].split(';')
        raise ClientError('Incorrect reply format')
        (key, val) = perm.strip().split('=')
    def set_object_sharing(
            read_permition=False, write_permition=False):
        """Give read/write permisions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permition for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) of users and user groups to get
            write permition for this object - False means all previous write
            permissions will be removed
        """
        # False/empty clears the corresponding permission list server-side
        perms = dict(read=read_permition or '', write=write_permition or '')
        self.object_post(obj, update=True, permissions=perms)
    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        # setting no permissions removes all sharing
        self.set_object_sharing(obj)
    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # ceiling division: even a tiny file occupies one block
        nblocks = 1 + (filesize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            block = source_file.read(min(blocksize, filesize - offset))
            # 'bytes */*' appends the block at the end of the object
            content_range='bytes */*',
            content_type='application/octet-stream',
            content_length=len(block),
    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file
        """
        content_range='bytes 0-%s/*' % upto_bytes,
        content_type='application/octet-stream',
        object_bytes=upto_bytes,
        # truncation is expressed as a ranged self-copy of the object
        source_object=path4url(self.container, obj))
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        # the requested range must lie inside the remote object
        if rf_size < int(start):
        'Range start exceeds file size',
        elif rf_size < int(end):
        'Range end exceeds file size',
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        upload_gen = upload_cb(nblocks)
        for i in range(nblocks):
            # never read past the source file or the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            content_type='application/octet-stream',
            content_length=len(block),
            content_range='bytes %s-%s/*' % (
                start + offset + len(block) - 1),
            offset += len(block)
            self, src_container, src_object, dst_container,
            source_version=None,
            source_account=None,
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        # default destination name: same as the source object
        dst_object or src_object,
        source_version=source_version,
        source_account=source_account,
        content_type=content_type,
        delimiter=delimiter)
            self, src_container, src_object, dst_container,
            source_account=None,
            source_version=None,
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)
        """
        self._assert_account()
        self.container = dst_container
        # default destination name: same as the source object
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        source_account=source_account,
        source_version=source_version,
        content_type=content_type,
        delimiter=delimiter)
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)
        """
        self._assert_account()
        self.set_param('format', 'json')
        # pagination parameters are only sent when explicitly given
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']